1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(TNS, sorting, distinct): TNS now accounts data used by RGDatas and distinct maps.

This commit is contained in:
drrtuy
2025-03-11 19:21:44 +00:00
parent c6dabe7eb5
commit 09926b3157
6 changed files with 87 additions and 26 deletions

View File

@ -54,7 +54,7 @@ void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo, bool
{
fRm = jobInfo.rm;
fSessionMemLimit = jobInfo.umMemLimit;
fErrorCode = ERR_LIMIT_TOO_BIG;
fErrorCode = ERR_ORDERBY_TOO_BIG;
// locate column position in the rowgroup
map<uint32_t, uint32_t> keyToIndexMap;

View File

@ -22,7 +22,7 @@
#include <cassert>
#include <sstream>
#include <iomanip>
#include <tr1/unordered_set>
#include <unordered_set>
using namespace std;
#include <boost/shared_ptr.hpp>
@ -83,8 +83,7 @@ struct TAEq
bool operator()(const rowgroup::Row::Pointer&, const rowgroup::Row::Pointer&) const;
};
// TODO: Generalize these and put them back in utils/common/hasher.h
typedef tr1::unordered_set<rowgroup::Row::Pointer, TAHasher, TAEq, STLPoolAllocator<rowgroup::Row::Pointer> >
DistinctMap_t;
using TNSDistinctMap_t = std::unordered_set<rowgroup::Row::Pointer, TAHasher, TAEq, allocators::CountingAllocator<rowgroup::Row::Pointer> >;
}; // namespace
inline uint64_t TAHasher::operator()(const Row::Pointer& p) const
@ -462,7 +461,6 @@ void TupleAnnexStep::executeNoOrderBy()
void TupleAnnexStep::executeNoOrderByWithDistinct()
{
utils::setThreadName("TNSwoOrdDist");
scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
vector<RGData> dataVec;
vector<RGData> dataVecSkip;
RGData rgDataIn;
@ -472,6 +470,9 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
Row rowSkip;
bool more = false;
auto alloc = fRm->getAllocator<rowgroup::Row::Pointer>();
std::unique_ptr<TNSDistinctMap_t> distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), alloc));
rgDataOut.reinit(fRowGroupOut);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
@ -508,7 +509,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
{
pair<DistinctMap_t::iterator, bool> inserted;
pair<TNSDistinctMap_t::iterator, bool> inserted;
Row* rowPtr;
if (distinctMap->size() < fLimitStart)
@ -544,6 +545,8 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
// allocate new RGData for skipped rows below the fLimitStart
// offset (do not take it into account in RM assuming there
// are few skipped rows
checkAndAllocateMemory4RGData(rowGroupSkip);
dataVecSkip.push_back(rgDataSkip);
rgDataSkip.reinit(rowGroupSkip);
rowGroupSkip.setData(&rgDataSkip);
@ -560,6 +563,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
if (UNLIKELY(fRowGroupOut.getRowCount() >= rowgroup::rgCommonSize))
{
checkAndAllocateMemory4RGData(fRowGroupOut);
dataVec.push_back(rgDataOut);
rgDataOut.reinit(fRowGroupOut);
fRowGroupOut.setData(&rgDataOut);
@ -572,6 +576,9 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
more = fInputDL->next(fInputIterator, &rgDataIn);
}
// to reduce memory consumption
dataVecSkip.clear();
if (fRowGroupOut.getRowCount() > 0)
dataVec.push_back(rgDataOut);
@ -581,6 +588,14 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
fRowGroupOut.setData(&rgDataOut);
fOutputDL->insert(rgDataOut);
}
while (!dataVec.empty())
{
auto& rgData = dataVec.back();
fRowGroupOut.setData(&rgData);
fRm->returnMemory(fRowGroupOut.getSizeWithStrings() - fRowGroupOut.getHeaderSize());
fOutputDL->insert(rgData);
dataVec.pop_back();
}
}
catch (...)
{
@ -595,6 +610,16 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
fOutputDL->endOfInput();
}
void TupleAnnexStep::checkAndAllocateMemory4RGData(const rowgroup::RowGroup& rowGroup)
{
uint64_t size = rowGroup.getSizeWithStrings() - rowGroup.getHeaderSize();
if (!fRm->getMemory(size, false))
{
cerr << IDBErrorInfo::instance()->errorMsg(ERR_TNS_DISTINCT_IS_TOO_BIG) << " @" << __FILE__ << ":" << __LINE__;
throw IDBExcept(ERR_TNS_DISTINCT_IS_TOO_BIG);
}
}
void TupleAnnexStep::executeWithOrderBy()
{
utils::setThreadName("TNSwOrd");
@ -669,6 +694,10 @@ void TupleAnnexStep::executeWithOrderBy()
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
// release RGData memory
size_t rgDataSize = fRowGroupOut.getSizeWithStrings() - fRowGroupOut.getHeaderSize();
fOrderBy->returnRGDataMemory2RM(rgDataSize);
}
}
}
@ -712,9 +741,10 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
// Calculate offset here
fRowGroupOut.getRow(0, &fRowOut);
auto alloc = fRm->getAllocator<ordering::OrderByRow>();
ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, alloc);
scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
auto allocSorting = fRm->getAllocator<ordering::OrderByRow>();
ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, allocSorting);
auto allocDistinct = fRm->getAllocator<rowgroup::Row::Pointer>();
std::unique_ptr<TNSDistinctMap_t> distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), allocDistinct));
fRowGroupIn.initRow(&row1);
fRowGroupIn.initRow(&row2);
@ -731,7 +761,7 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
fOrderByList[id]->getRule().revertRules();
ordering::SortingPQ& currentPQ = fOrderByList[id]->getQueue();
finalPQ.reserve(finalPQ.size() + currentPQ.size());
pair<DistinctMap_t::iterator, bool> inserted;
pair<TNSDistinctMap_t::iterator, bool> inserted;
while (currentPQ.size())
{
ordering::OrderByRow& topOBRow = const_cast<ordering::OrderByRow&>(currentPQ.top());
@ -868,14 +898,6 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
fOutputDL->endOfInput();
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (traceOn())
{
if (dlTimes.FirstReadTime().tv_sec == 0)
@ -885,6 +907,20 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
dlTimes.setEndOfInputTime();
printCalTrace();
}
// Release memory before ctor
for (uint64_t id = 1; id <= fMaxThreads; id++)
{
fOrderByList[id]->returnAllRGDataMemory2RM();
}
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
}
/*
@ -1056,6 +1092,19 @@ void TupleAnnexStep::finalizeParallelOrderBy()
fOutputDL->endOfInput();
if (traceOn())
{
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
printCalTrace();
}
// Release memory before ctor
for (uint64_t id = 1; id <= fMaxThreads; id++)
{
fOrderByList[id]->returnAllRGDataMemory2RM();
}
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
@ -1063,13 +1112,6 @@ void TupleAnnexStep::finalizeParallelOrderBy()
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (traceOn())
{
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
printCalTrace();
}
}
void TupleAnnexStep::executeParallelOrderBy(uint64_t id)

View File

@ -109,6 +109,7 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep
void executeWithOrderBy();
void executeParallelOrderBy(uint64_t id);
void executeNoOrderByWithDistinct();
void checkAndAllocateMemory4RGData(const rowgroup::RowGroup& rowGroup);
void formatMiniStats();
void printCalTrace();
void finalizeParallelOrderBy();

View File

@ -110,6 +110,8 @@
2061 ERR_NOT_SUPPORTED_GROUPBY_ORDERBY_EXPRESSION %1% is not in GROUP BY clause, not a column or an expression that contains function.
2063 ERR_TNS_DISTINCT_IS_TOO_BIG DISTINCT memory limit is exceeded whilst running TNS step.
# Sub-query errors
3001 ERR_NON_SUPPORT_SUB_QUERY_TYPE This subquery type is not supported yet.
3002 ERR_MORE_THAN_1_ROW Subquery returns more than 1 row.

View File

@ -737,7 +737,8 @@ IdbOrderBy::IdbOrderBy()
IdbOrderBy::~IdbOrderBy()
{
if (fRm)
// returnRGDataMemory2RM() returns all memory before the dtor is called.
if (fRm && fMemSize > 0)
fRm->returnMemory(fMemSize, fSessionMemLimit);
// delete compare objects

View File

@ -33,6 +33,7 @@
#include <unordered_set>
#include "countingallocator.h"
#include "resourcemanager.h"
#include "rowgroup.h"
#include "hasher.h"
// #include "stlpoolallocator.h"
@ -433,6 +434,20 @@ class IdbOrderBy : public IdbCompare
{
return *fOrderByQueue;
}
void returnAllRGDataMemory2RM()
{
while (!fOrderByQueue->empty())
{
fOrderByQueue->pop();
}
fRm->returnMemory(fMemSize, fSessionMemLimit);
fMemSize = 0;
}
void returnRGDataMemory2RM(const size_t rgDataSize)
{
fRm->returnMemory(rgDataSize, fSessionMemLimit);
fMemSize -= rgDataSize;
}
CompareRule& getRule()
{
return fRule;