You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-5451 This resolves external GROUP BY result inconsistency issues (#2791)
Given that idx is a RH hashmap bucket number and info is intra-bucket idx the root cause is triggered by the difference of idx/hash pair calculation for a certain GROUP BY generation and for generation aggregations merging that takes place in RowAggStorage::finalize. This patch generalizes rowHashToIdx to leverage it in both cases mentioned above.
This commit is contained in:
@ -1556,8 +1556,8 @@ bool RowAggStorage::getTargetRow(const Row& row, uint64_t hash, Row& rowOut)
|
||||
if (fExtKeys)
|
||||
{
|
||||
fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, fKeysRowGroup, fMaxRows, fMM->getResourceManaged(),
|
||||
fMM->getSessionLimit(), !fEnabledDiskAggregation,
|
||||
!fEnabledDiskAggregation, fCompressor.get()));
|
||||
fMM->getSessionLimit(), !fEnabledDiskAggregation,
|
||||
!fEnabledDiskAggregation, fCompressor.get()));
|
||||
fKeysStorage = fRealKeysStorage.get();
|
||||
}
|
||||
else
|
||||
@ -1571,10 +1571,8 @@ bool RowAggStorage::getTargetRow(const Row& row, uint64_t hash, Row& rowOut)
|
||||
{
|
||||
increaseSize();
|
||||
}
|
||||
size_t idx{};
|
||||
uint32_t info{};
|
||||
auto [info, idx] = rowHashToIdx(hash);
|
||||
|
||||
rowHashToIdx(hash, info, idx);
|
||||
nextWhileLess(info, idx);
|
||||
|
||||
while (info == fCurData->fInfo[idx])
|
||||
@ -1604,9 +1602,8 @@ bool RowAggStorage::getTargetRow(const Row& row, uint64_t hash, Row& rowOut)
|
||||
do
|
||||
{
|
||||
auto* genData = fGens[gen].get();
|
||||
size_t gidx{};
|
||||
uint32_t ginfo{};
|
||||
rowHashToIdx(hash, ginfo, gidx, genData);
|
||||
auto [ginfo, gidx] = rowHashToIdx(hash, genData->fMask, genData->hashMultiplier_, genData->fInfoInc,
|
||||
genData->fInfoHashShift);
|
||||
nextWhileLess(ginfo, gidx, genData);
|
||||
|
||||
while (ginfo == genData->fInfo[gidx])
|
||||
@ -1694,9 +1691,7 @@ void RowAggStorage::dump()
|
||||
{
|
||||
startNewGeneration();
|
||||
}
|
||||
else if (fAllowGenerations &&
|
||||
freeMem < totalMem / 10 * 3 &&
|
||||
fRandDistr(fRandGen) < 30)
|
||||
else if (fAllowGenerations && freeMem < totalMem / 10 * 3 && fRandDistr(fRandGen) < 30)
|
||||
{
|
||||
startNewGeneration();
|
||||
}
|
||||
@ -1786,18 +1781,6 @@ void RowAggStorage::shiftUp(size_t startIdx, size_t insIdx)
|
||||
fCurData->fHashes->shiftUp(startIdx, insIdx);
|
||||
}
|
||||
|
||||
void RowAggStorage::rowToIdx(const Row& row, uint32_t& info, size_t& idx, uint64_t& hash,
|
||||
const Data* curData) const
|
||||
{
|
||||
hash = hashRow(row, fLastKeyCol);
|
||||
return rowHashToIdx(hash, info, idx, curData);
|
||||
}
|
||||
|
||||
void RowAggStorage::rowToIdx(const Row& row, uint32_t& info, size_t& idx, uint64_t& hash) const
|
||||
{
|
||||
return rowToIdx(row, info, idx, hash, fCurData);
|
||||
}
|
||||
|
||||
void RowAggStorage::increaseSize()
|
||||
{
|
||||
if (fCurData->fMask == 0)
|
||||
@ -1828,7 +1811,9 @@ void RowAggStorage::increaseSize()
|
||||
// we have to resize, even though there would still be plenty of space left!
|
||||
// Try to rehash instead. Delete freed memory so we don't steadyily increase mem in case
|
||||
// we have to rehash a few times
|
||||
nextHashMultiplier();
|
||||
// adding an *even* number, so that the multiplier will always stay odd. This is necessary
|
||||
// so that the hash stays a mixing function (and thus doesn't have any information loss).
|
||||
fCurData->hashMultiplier_ += 0xc4ceb9fe1a85ec54;
|
||||
rehashPowerOfTwo(fCurData->fMask + 1);
|
||||
}
|
||||
else
|
||||
@ -1898,10 +1883,8 @@ void RowAggStorage::insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes)
|
||||
logging::ERR_DISKAGG_OVERFLOW1);
|
||||
}
|
||||
|
||||
size_t idx{};
|
||||
uint32_t info{};
|
||||
auto pos = oldHashes->get(oldIdx);
|
||||
rowHashToIdx(pos.hash, info, idx);
|
||||
auto [info, idx] = rowHashToIdx(pos.hash);
|
||||
|
||||
while (info <= fCurData->fInfo[idx])
|
||||
{
|
||||
@ -2023,6 +2006,7 @@ void RowAggStorage::dumpInternalData() const
|
||||
bs << fCurData->fSize;
|
||||
bs << fCurData->fMask;
|
||||
bs << fCurData->fMaxSize;
|
||||
bs << fCurData->hashMultiplier_;
|
||||
bs << fCurData->fInfoInc;
|
||||
bs << fCurData->fInfoHashShift;
|
||||
bs.append(fCurData->fInfo.get(), calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize)));
|
||||
@ -2090,6 +2074,7 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
|
||||
size_t prevSize;
|
||||
size_t prevMask;
|
||||
size_t prevMaxSize;
|
||||
size_t prevHashMultiplier;
|
||||
uint32_t prevInfoInc;
|
||||
uint32_t prevInfoHashShift;
|
||||
std::unique_ptr<uint8_t[]> prevInfo;
|
||||
@ -2111,7 +2096,8 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
|
||||
else
|
||||
prevKeyRowStorage = prevRowStorage.get();
|
||||
|
||||
loadGeneration(prevGen, prevSize, prevMask, prevMaxSize, prevInfoInc, prevInfoHashShift, prevInfo);
|
||||
loadGeneration(prevGen, prevSize, prevMask, prevMaxSize, prevHashMultiplier, prevInfoInc,
|
||||
prevInfoHashShift, prevInfo);
|
||||
prevHashes = fCurData->fHashes->clone(prevMask + 1, prevGen, true);
|
||||
|
||||
// iterate over current generation rows
|
||||
@ -2128,7 +2114,6 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
|
||||
}
|
||||
|
||||
const auto& pos = fCurData->fHashes->get(idx);
|
||||
|
||||
if (fKeysStorage->isFinalized(pos.idx))
|
||||
{
|
||||
// this row was already merged into newer generation, skip it
|
||||
@ -2136,8 +2121,9 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
|
||||
}
|
||||
|
||||
// now try to find row in the previous generation
|
||||
uint32_t pinfo = prevInfoInc + static_cast<uint32_t>((pos.hash & INFO_MASK) >> prevInfoHashShift);
|
||||
uint64_t pidx = (pos.hash >> INIT_INFO_BITS) & prevMask;
|
||||
auto [pinfo, pidx] =
|
||||
rowHashToIdx(pos.hash, prevMask, prevHashMultiplier, prevInfoInc, prevInfoHashShift);
|
||||
|
||||
while (pinfo < prevInfo[pidx])
|
||||
{
|
||||
++pidx;
|
||||
@ -2276,12 +2262,12 @@ void RowAggStorage::finalize(std::function<void(Row&)> mergeFunc, Row& rowOut)
|
||||
|
||||
void RowAggStorage::loadGeneration(uint16_t gen)
|
||||
{
|
||||
loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->fInfoInc,
|
||||
fCurData->fInfoHashShift, fCurData->fInfo);
|
||||
loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->hashMultiplier_,
|
||||
fCurData->fInfoInc, fCurData->fInfoHashShift, fCurData->fInfo);
|
||||
}
|
||||
|
||||
void RowAggStorage::loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize,
|
||||
uint32_t& infoInc, uint32_t& infoHashShift,
|
||||
size_t& hashMultiplier, uint32_t& infoInc, uint32_t& infoHashShift,
|
||||
std::unique_ptr<uint8_t[]>& info)
|
||||
{
|
||||
messageqcpp::ByteStream bs;
|
||||
@ -2312,6 +2298,7 @@ void RowAggStorage::loadGeneration(uint16_t gen, size_t& size, size_t& mask, siz
|
||||
bs >> size;
|
||||
bs >> mask;
|
||||
bs >> maxSize;
|
||||
bs >> hashMultiplier;
|
||||
bs >> infoInc;
|
||||
bs >> infoHashShift;
|
||||
size_t infoSz = calcBytes(calcSizeWithBuffer(mask + 1, maxSize));
|
||||
|
Reference in New Issue
Block a user