1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

feat(TNS): distribute SortingPQ that supports CountingAllocator

This commit is contained in:
drrtuy
2025-03-10 19:10:45 +00:00
parent be5711cf0d
commit c6dabe7eb5
7 changed files with 93 additions and 73 deletions

View File

@ -768,6 +768,7 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
if (concatColIsNull(row))
return;
auto& orderByQueue = getQueue();
// if the row count is less than the limit
if (fCurrentLength < fGroupConcatLen)
{
@ -776,7 +777,7 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
int16_t estLen = lengthEstimate(fRow0);
fRow0.setRid(estLen);
OrderByRow newRow(fRow0, fRule);
fOrderByQueue.push(newRow);
orderByQueue.push(newRow);
fCurrentLength += estLen;
// add to the distinct map
@ -806,11 +807,11 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
}
}
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
fRow1.setData(swapRow.fData);
fOrderByQueue.pop();
orderByQueue.pop();
fCurrentLength -= fRow1.getRelRid();
fRow2.setData(swapRow.fData);
@ -830,7 +831,7 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
fRow2.setRid(estLen);
fCurrentLength += estLen;
fOrderByQueue.push(swapRow);
orderByQueue.push(swapRow);
}
}
@ -838,9 +839,12 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
{
GroupConcatOrderBy* go = dynamic_cast<GroupConcatOrderBy*>(gc);
while (go->fOrderByQueue.empty() == false)
auto& orderByQueue = getQueue();
auto mergeQueue = go->getQueue();
while (mergeQueue.empty() == false)
{
const OrderByRow& row = go->fOrderByQueue.top();
const OrderByRow& row = mergeQueue.top();
// check if the distinct row already exists
if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end())
@ -851,7 +855,7 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
// if the row count is less than the limit
else if (fCurrentLength < fGroupConcatLen)
{
fOrderByQueue.push(row);
orderByQueue.push(row);
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();
@ -860,11 +864,11 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
fDistinctMap->insert(row.fData);
}
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
row1.setData(swapRow.fData);
fOrderByQueue.pop();
orderByQueue.pop();
fCurrentLength -= row1.getRelRid();
if (fDistinct)
@ -876,10 +880,10 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();
fOrderByQueue.push(row);
orderByQueue.push(row);
}
go->fOrderByQueue.pop();
mergeQueue.pop();
}
}
@ -890,10 +894,12 @@ uint8_t* GroupConcatOrderBy::getResultImpl(const string& sep)
// need to reverse the order
stack<OrderByRow> rowStack;
while (fOrderByQueue.size() > 0)
auto& orderByQueue = getQueue();
while (orderByQueue.size() > 0)
{
rowStack.push(fOrderByQueue.top());
fOrderByQueue.pop();
rowStack.push(orderByQueue.top());
orderByQueue.pop();
}
size_t prevResultSize = 0;

View File

@ -762,6 +762,8 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
if (concatColIsNull(row))
return;
auto& orderByQueue = getQueue();
// if the row count is less than the limit
if (fCurrentLength < fGroupConcatLen)
{
@ -770,7 +772,7 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
int16_t estLen = lengthEstimate(fRow0);
fRow0.setRid(estLen);
OrderByRow newRow(fRow0, fRule);
fOrderByQueue.push(newRow);
orderByQueue.push(newRow);
fCurrentLength += estLen;
// add to the distinct map
@ -800,11 +802,11 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
}
}
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
fRow1.setData(swapRow.fData);
fOrderByQueue.pop();
orderByQueue.pop();
fCurrentLength -= fRow1.getRelRid();
fRow2.setData(swapRow.fData);
@ -824,7 +826,7 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
fRow2.setRid(estLen);
fCurrentLength += estLen;
fOrderByQueue.push(swapRow);
orderByQueue.push(swapRow);
}
}
@ -832,9 +834,12 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
{
JsonArrayAggOrderBy* go = dynamic_cast<JsonArrayAggOrderBy*>(gc);
while (go->fOrderByQueue.empty() == false)
auto& orderByQueue = getQueue();
auto mergeQueue = go->getQueue();
while (mergeQueue.empty() == false)
{
const OrderByRow& row = go->fOrderByQueue.top();
const OrderByRow& row = mergeQueue.top();
// check if the distinct row already exists
if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end())
@ -845,7 +850,7 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
// if the row count is less than the limit
else if (fCurrentLength < fGroupConcatLen)
{
fOrderByQueue.push(row);
orderByQueue.push(row);
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();
@ -854,11 +859,11 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
fDistinctMap->insert(row.fData);
}
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
row1.setData(swapRow.fData);
fOrderByQueue.pop();
orderByQueue.pop();
fCurrentLength -= row1.getRelRid();
if (fDistinct)
@ -870,10 +875,10 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();
fOrderByQueue.push(row);
orderByQueue.push(row);
}
go->fOrderByQueue.pop();
mergeQueue.pop();
}
}
@ -884,11 +889,12 @@ uint8_t* JsonArrayAggOrderBy::getResultImpl(const string&)
// need to reverse the order
stack<OrderByRow> rowStack;
auto& orderByQueue = getQueue();
while (fOrderByQueue.size() > 0)
while (orderByQueue.size() > 0)
{
rowStack.push(fOrderByQueue.top());
fOrderByQueue.pop();
rowStack.push(orderByQueue.top());
orderByQueue.pop();
}
if (rowStack.size() > 0)
{

View File

@ -109,12 +109,13 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
if (fCount == 0)
return;
auto& orderByQueue = getQueue();
// if the row count is less than the limit
if (fOrderByQueue.size() < fStart + fCount)
if (orderByQueue.size() < fStart + fCount)
{
copyRow(row, &fRow0);
OrderByRow newRow(fRow0, fRule);
fOrderByQueue.push(newRow);
orderByQueue.push(newRow);
uint64_t memSizeInc = sizeof(newRow);
fUncommitedMemory += memSizeInc;
@ -155,20 +156,20 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
}
}
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
row1.setData(swapRow.fData);
copyRow(row, &row1);
if (fDistinct)
{
fDistinctMap->erase(fOrderByQueue.top().fData);
fDistinctMap->erase(orderByQueue.top().fData);
fDistinctMap->insert(row1.getPointer());
}
fOrderByQueue.pop();
fOrderByQueue.push(swapRow);
orderByQueue.pop();
orderByQueue.push(swapRow);
}
}
@ -194,7 +195,9 @@ void LimitedOrderBy::finalize()
if (fRowGroup.getRowCount() > 0)
fDataQueue.push(fData);
if (fOrderByQueue.size() > 0)
auto& orderByQueue = getQueue();
if (orderByQueue.size() > 0)
{
// *DRRTUY Very memory intensive. CS needs to account active
// memory only and release memory if needed.
@ -210,7 +213,7 @@ void LimitedOrderBy::finalize()
uint64_t offset = 0;
uint64_t i = 0;
// Reduce queue size by an offset value if it applicable.
uint64_t queueSizeWoOffset = fOrderByQueue.size() > fStart ? fOrderByQueue.size() - fStart : 0;
uint64_t queueSizeWoOffset = orderByQueue.size() > fStart ? orderByQueue.size() - fStart : 0;
list<RGData> tempRGDataList;
if (fCount <= queueSizeWoOffset)
@ -239,15 +242,15 @@ void LimitedOrderBy::finalize()
offset = offset != 0 ? offset - 1 : offset;
fRowGroup.getRow(offset, &fRow0);
while ((fOrderByQueue.size() > fStart) && (i++ < fCount))
while ((orderByQueue.size() > fStart) && (i++ < fCount))
{
const OrderByRow& topRow = fOrderByQueue.top();
const OrderByRow& topRow = orderByQueue.top();
row1.setData(topRow.fData);
copyRow(row1, &fRow0);
fRowGroup.incRowCount();
offset--;
fRow0.prevRow(rSize);
fOrderByQueue.pop();
orderByQueue.pop();
// if RG has fRowsPerRG rows
if (offset == (uint64_t)-1)

View File

@ -165,6 +165,8 @@ void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
{
// Initialize ResourceManager to acount memory usage.
fRm = jobInfo.rm;
// Initialize structures used by separate workers
uint64_t id = 1;
fRowGroupIn = rgIn;
@ -709,7 +711,9 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
fRowGroupOut.resetRowGroup(0);
// Calculate offset here
fRowGroupOut.getRow(0, &fRowOut);
ordering::SortingPQ finalPQ;
auto alloc = fRm->getAllocator<ordering::OrderByRow>();
ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, alloc);
scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
fRowGroupIn.initRow(&row1);
fRowGroupIn.initRow(&row2);
@ -903,7 +907,8 @@ void TupleAnnexStep::finalizeParallelOrderBy()
uint32_t rowSize = 0;
rowgroup::RGData rgDataOut;
ordering::SortingPQ finalPQ;
auto alloc = fRm->getAllocator<ordering::OrderByRow>();
ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, alloc);
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);

View File

@ -171,25 +171,7 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep
std::vector<uint64_t> fRunnersList;
uint16_t fFinishedThreads;
boost::mutex fParallelFinalizeMutex;
};
template <class T>
class reservablePQ : private std::priority_queue<T>
{
public:
typedef typename std::priority_queue<T>::size_type size_type;
explicit reservablePQ(size_type capacity = 0)
{
reserve(capacity);
};
void reserve(size_type capacity)
{
this->c.reserve(capacity);
}
size_type capacity() const
{
return this->c.capacity();
}
joblist::ResourceManager* fRm;
};
} // namespace joblist