1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

feat(PrimProc): MCOL-5950 Improve disk-based aggregation finalization (#3525)

* feat(PrimProc): MCOL-5950 Improve disk-based aggregation finalization

    Iterate over the rows in the plain vector of RGData instead of
    iterating over the hashmap. This reduces the complexity and speeds
    up finalization (by up to the twice in the certain cases)

* replace magic constant with muggle constant
This commit is contained in:
Alexey Antipovsky
2025-05-21 11:53:48 +02:00
committed by GitHub
parent 0dceaa318d
commit bd0f59910a
2 changed files with 104 additions and 100 deletions

View File

@ -19,12 +19,15 @@
#include <sys/stat.h>
#include <boost/filesystem.hpp>
#include <cstdint>
#include "branchpred.h"
#include "rowgroup.h"
#include <resourcemanager.h>
#include <fcntl.h>
#include "rowstorage.h"
#include "robin_hood.h"
//#define DISK_AGG_DEBUG
namespace
{
int writeData(int fd, const char* buf, size_t sz)
@ -284,8 +287,10 @@ class MemManager
release(fMemUsed);
}
bool acquire(std::size_t amount)
bool acquire(ssize_t amount)
{
if (UNLIKELY(-amount > fMemUsed))
amount = -fMemUsed;
return acquireImpl(amount);
}
void release(ssize_t amount = 0)
@ -332,16 +337,16 @@ class MemManager
}
protected:
virtual bool acquireImpl(std::size_t amount)
virtual bool acquireImpl(ssize_t amount)
{
fMemUsed += amount;
return true;
}
virtual void releaseImpl(std::size_t amount)
virtual void releaseImpl(ssize_t amount)
{
fMemUsed -= amount;
}
ssize_t fMemUsed = 0;
ssize_t fMemUsed{0};
};
class RMMemManager : public MemManager
@ -389,7 +394,7 @@ class RMMemManager : public MemManager
}
protected:
bool acquireImpl(size_t amount) final
bool acquireImpl(ssize_t amount) final
{
if (amount)
{
@ -402,7 +407,7 @@ class RMMemManager : public MemManager
return true;
}
void releaseImpl(size_t amount) override
void releaseImpl(ssize_t amount) override
{
if (amount)
{
@ -858,8 +863,7 @@ class RowGroupStorage
fRGDatas.pop_back();
fRowGroupOut->setData(rgdata.get());
int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
fMM->release(memSz);
fMM->release(fRowGroupOut->getSizeWithStrings(fMaxRows));
fLRU->remove(rgid);
if (fRowGroupOut->getRowCount() == 0)
continue;
@ -890,6 +894,50 @@ class RowGroupStorage
fLRU->add(rgid);
}
/** @brief Get the row at the specified position, skipping nonexistent rows
*
* @param idx(in, out) index (from 0) of the row, set to the actual index
* @param row(out) resulting row
*/
bool getRowForFinalization(uint64_t& idx, Row& row)
{
auto [rgid, rid] = rowIdxToGidRid(idx, fMaxRows);
while (rgid < fRGDatas.size())
{
if (UNLIKELY(!fRGDatas[rgid]))
{
loadRG(rgid);
}
fRowGroupOut->setData(fRGDatas[rgid].get());
if (UNLIKELY(rid >= fRowGroupOut->getRowCount()))
{
++rgid;
rid = 0;
continue;
}
fRGDatas[rgid]->getRow(rid, &row);
idx = rowGidRidToIdx(rgid, rid, fMaxRows);
return true;
}
return false;
}
size_t getRowsPerRG() const
{
return fMaxRows;
}
void dropRGData(uint64_t rgid)
{
if (UNLIKELY(!fRGDatas[rgid]))
{
return;
}
fRowGroupOut->setData(fRGDatas[rgid].get());
fMM->release(fRowGroupOut->getSizeWithStrings(fMaxRows));
fRGDatas[rgid].reset();
}
/** @brief Return a row and an index at the first free position.
*
* @param idx(out) index of the row
@ -1797,9 +1845,9 @@ void RowAggStorage::dump()
if (!fEnabledDiskAggregation)
return;
constexpr const int freeMemLimit = 50ULL * 1024ULL * 1024ULL;
constexpr int64_t freeMemLimit = 50LL * 1024LL * 1024LL;
const int64_t leaveFree = fNumOfInputRGPerThread * fRowGroupOut->getRowSize() * getBucketSize();
const int64_t leaveFree = std::max(RGDataSizeType(freeMemLimit), fNumOfInputRGPerThread * fRowGroupOut->getSizeWithStrings(fMaxRows) * getBucketSize());
uint64_t freeAttempts{0};
int64_t freeMem = 0;
while (true)
@ -2241,21 +2289,11 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
fKeysStorage = fStorage.get();
continue;
}
std::unique_ptr<RowPosHashStorage> prevHashes;
size_t prevSize;
size_t prevMask;
size_t prevMaxSize;
size_t prevHashMultiplier;
uint32_t prevInfoInc;
uint32_t prevInfoHashShift;
std::unique_ptr<uint8_t[]> prevInfo;
std::unique_ptr<RowGroupStorage> prevRowStorage;
std::unique_ptr<RowGroupStorage> prevRealKeyRowStorage;
RowGroupStorage* prevKeyRowStorage{nullptr};
auto elems = calcSizeWithBuffer(fCurData->fMask + 1);
for (uint16_t prevGen = 0; prevGen < curGen; ++prevGen)
{
prevRowStorage.reset(fStorage->clone(prevGen));
@ -2267,94 +2305,63 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
else
prevKeyRowStorage = prevRowStorage.get();
loadGeneration(prevGen, prevSize, prevMask, prevMaxSize, prevHashMultiplier, prevInfoInc,
prevInfoHashShift, prevInfo);
prevHashes = fCurData->fHashes->clone(prevMask + 1, prevGen, true);
// iterate over current generation rows
uint64_t idx{};
uint32_t info{};
for (;; next(info, idx))
if (fExtKeys)
{
nextExisting(info, idx);
fKeysRowGroup->initRow(&tmpKeyRow);
}
auto& prevRow = fExtKeys ? tmpKeyRow : tmpRow;
if (idx >= elems)
// iterate over rows in prev generation
for (uint64_t rowidx = 0; prevKeyRowStorage->getRowForFinalization(rowidx, prevRow); ++rowidx)
{
auto [rgid, rid] = rowIdxToGidRid(rowidx, prevKeyRowStorage->getRowsPerRG());
if (rgid != 0 && rid == 0)
{
// done finalizing generation
break;
// start next RGData. At this point we don't need the previous one and can free the data
// as it has no changes to dump
prevKeyRowStorage->dropRGData(rgid - 1);
if (fExtKeys) {
prevKeyRowStorage->dropRGData(rgid - 1);
}
}
const auto& pos = fCurData->fHashes->get(idx);
if (fKeysStorage->isFinalized(pos.idx))
if (prevKeyRowStorage->isFinalized(rowidx))
{
// this row was already merged into newer generation, skip it
continue;
}
// now try to find row in the previous generation
auto [pinfo, pidx] =
rowHashToIdx(pos.hash, prevMask, prevHashMultiplier, prevInfoInc, prevInfoHashShift);
while (pinfo < prevInfo[pidx])
// TODO: store hashes in the RowGroupStorage?
uint64_t hash = hashRow(prevRow, fLastKeyCol);
auto [info, idx] = rowHashToIdx(hash);
nextWhileLess(info, idx);
constexpr uint64_t NOTFOUND_IDX = -1;
uint64_t curidx = NOTFOUND_IDX;
while (info == fCurData->fInfo[idx])
{
++pidx;
pinfo += prevInfoInc;
}
if (prevInfo[pidx] != pinfo)
{
// there is no such row
continue;
}
auto ppos = prevHashes->get(pidx);
while (prevInfo[pidx] == pinfo && pos.hash != ppos.hash)
{
// hashes are not equal => collision, try all matched rows
++pidx;
pinfo += prevInfoInc;
ppos = prevHashes->get(pidx);
}
if (prevInfo[pidx] != pinfo || pos.hash != ppos.hash)
{
// no matches
continue;
}
bool found = false;
auto& keyRow = fExtKeys ? fKeyRow : rowOut;
fKeysStorage->getRow(pos.idx, keyRow);
while (!found && prevInfo[pidx] == pinfo)
{
// try to find exactly match in case of hash collision
if (UNLIKELY(pos.hash != ppos.hash))
auto& pos = fCurData->fHashes->get(idx);
if (pos.hash == hash)
{
// hashes are not equal => no such row
++pidx;
pinfo += prevInfoInc;
ppos = prevHashes->get(pidx);
continue;
auto& keyRow = fExtKeys ? fKeyRow : rowOut;
fKeysStorage->getRow(pos.idx, keyRow);
if (prevRow.equals(keyRow, fLastKeyCol))
{
// Hallelujah!
curidx = pos.idx;
break;
}
}
prevKeyRowStorage->getRow(ppos.idx, fExtKeys ? tmpKeyRow : tmpRow);
if (!keyRow.equals(fExtKeys ? tmpKeyRow : tmpRow, fLastKeyCol))
{
++pidx;
pinfo += prevInfoInc;
ppos = prevHashes->get(pidx);
continue;
}
found = true;
next(info, idx);
}
if (!found)
if (curidx == NOTFOUND_IDX)
{
// nothing was found, go to the next row
continue;
}
if (UNLIKELY(prevKeyRowStorage->isFinalized(ppos.idx)))
if (UNLIKELY(fKeysStorage->isFinalized(curidx)))
{
// just to be sure, it can NEVER happen
continue;
}
@ -2365,15 +2372,15 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
// 4 Mark the prev generation row as finalized
if (fExtKeys)
{
prevRowStorage->getRow(ppos.idx, tmpRow);
fStorage->getRow(pos.idx, rowOut);
prevRowStorage->getRow(rowidx, tmpRow);
fStorage->getRow(curidx, rowOut);
}
mergeFunc(tmpRow);
genUpdated = true;
prevKeyRowStorage->markFinalized(ppos.idx);
prevKeyRowStorage->markFinalized(rowidx);
if (fExtKeys)
{
prevRowStorage->markFinalized(ppos.idx);
prevRowStorage->markFinalized(rowidx);
}
}
@ -2396,14 +2403,11 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
fKeysStorage->dumpAll(false);
}
// swap current generation N with the prev generation N-1
fCurData->fSize = prevSize;
fCurData->fMask = prevMask;
fCurData->fMaxSize = prevMaxSize;
fCurData->fInfoInc = prevInfoInc;
fCurData->fInfoHashShift = prevInfoHashShift;
fCurData->fInfo = std::move(prevInfo);
fCurData->fHashes = std::move(prevHashes);
// load previous generation (reusing RowGroupStorages)
loadGeneration(curGen - 1);
auto oh = std::move(fCurData->fHashes);
fCurData->fHashes = oh->clone(0, curGen - 1, true);
fStorage = std::move(prevRowStorage);
if (fExtKeys)
{