From a1d20d82d53d9ed56c3bdec479b3357f8e1086ad Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Tue, 28 Mar 2023 17:10:41 +0100 Subject: [PATCH] MCOL-5451 This resolves external GROUP BY result inconsistency issues (#2791) Given that idx is a RH hashmap bucket number and info is intra-bucket idx the root cause is triggered by the difference of idx/hash pair calculation for a certain GROUP BY generation and for generation aggregations merging that takes place in RowAggStorage::finalize. This patch generalizes rowHashToIdx to leverage it in both cases mentioned above. --- utils/rowgroup/rowstorage.cpp | 55 +++++++++++++---------------------- utils/rowgroup/rowstorage.h | 43 ++++++++------------------- 2 files changed, 33 insertions(+), 65 deletions(-) diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index 4690f6cf5..5d997a6dc 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -1556,8 +1556,8 @@ bool RowAggStorage::getTargetRow(const Row& row, uint64_t hash, Row& rowOut) if (fExtKeys) { fRealKeysStorage.reset(new RowGroupStorage(fTmpDir, fKeysRowGroup, fMaxRows, fMM->getResourceManaged(), - fMM->getSessionLimit(), !fEnabledDiskAggregation, - !fEnabledDiskAggregation, fCompressor.get())); + fMM->getSessionLimit(), !fEnabledDiskAggregation, + !fEnabledDiskAggregation, fCompressor.get())); fKeysStorage = fRealKeysStorage.get(); } else @@ -1571,10 +1571,8 @@ bool RowAggStorage::getTargetRow(const Row& row, uint64_t hash, Row& rowOut) { increaseSize(); } - size_t idx{}; - uint32_t info{}; + auto [info, idx] = rowHashToIdx(hash); - rowHashToIdx(hash, info, idx); nextWhileLess(info, idx); while (info == fCurData->fInfo[idx]) @@ -1604,9 +1602,8 @@ bool RowAggStorage::getTargetRow(const Row& row, uint64_t hash, Row& rowOut) do { auto* genData = fGens[gen].get(); - size_t gidx{}; - uint32_t ginfo{}; - rowHashToIdx(hash, ginfo, gidx, genData); + auto [ginfo, gidx] = rowHashToIdx(hash, genData->fMask, genData->hashMultiplier_, genData->fInfoInc, + genData->fInfoHashShift); nextWhileLess(ginfo, gidx, genData); while (ginfo == genData->fInfo[gidx]) @@ -1694,9 +1691,7 @@ void RowAggStorage::dump() { startNewGeneration(); } - else if (fAllowGenerations && - freeMem < totalMem / 10 * 3 && - fRandDistr(fRandGen) < 30) + else if (fAllowGenerations && freeMem < totalMem / 10 * 3 && fRandDistr(fRandGen) < 30) { startNewGeneration(); } @@ -1786,18 +1781,6 @@ void RowAggStorage::shiftUp(size_t startIdx, size_t insIdx) fCurData->fHashes->shiftUp(startIdx, insIdx); } -void RowAggStorage::rowToIdx(const Row& row, uint32_t& info, size_t& idx, uint64_t& hash, - const Data* curData) const -{ - hash = hashRow(row, fLastKeyCol); - return rowHashToIdx(hash, info, idx, curData); -} - -void RowAggStorage::rowToIdx(const Row& row, uint32_t& info, size_t& idx, uint64_t& hash) const -{ - return rowToIdx(row, info, idx, hash, fCurData); -} - void RowAggStorage::increaseSize() { if (fCurData->fMask == 0) @@ -1828,7 +1811,9 @@ void RowAggStorage::increaseSize() // we have to resize, even though there would still be plenty of space left! // Try to rehash instead. Delete freed memory so we don't steadyily increase mem in case // we have to rehash a few times - nextHashMultiplier(); + // adding an *even* number, so that the multiplier will always stay odd. This is necessary + // so that the hash stays a mixing function (and thus doesn't have any information loss). + fCurData->hashMultiplier_ += 0xc4ceb9fe1a85ec54; rehashPowerOfTwo(fCurData->fMask + 1); } else @@ -1898,10 +1883,8 @@ void RowAggStorage::insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes) logging::ERR_DISKAGG_OVERFLOW1); } - size_t idx{}; - uint32_t info{}; auto pos = oldHashes->get(oldIdx); - rowHashToIdx(pos.hash, info, idx); + auto [info, idx] = rowHashToIdx(pos.hash); while (info <= fCurData->fInfo[idx]) { @@ -2023,6 +2006,7 @@ void RowAggStorage::dumpInternalData() const bs << fCurData->fSize; bs << fCurData->fMask; bs << fCurData->fMaxSize; + bs << fCurData->hashMultiplier_; bs << fCurData->fInfoInc; bs << fCurData->fInfoHashShift; bs.append(fCurData->fInfo.get(), calcBytes(calcSizeWithBuffer(fCurData->fMask + 1, fCurData->fMaxSize))); @@ -2090,6 +2074,7 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) size_t prevSize; size_t prevMask; size_t prevMaxSize; + size_t prevHashMultiplier; uint32_t prevInfoInc; uint32_t prevInfoHashShift; std::unique_ptr prevInfo; @@ -2111,7 +2096,8 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) else prevKeyRowStorage = prevRowStorage.get(); - loadGeneration(prevGen, prevSize, prevMask, prevMaxSize, prevInfoInc, prevInfoHashShift, prevInfo); + loadGeneration(prevGen, prevSize, prevMask, prevMaxSize, prevHashMultiplier, prevInfoInc, + prevInfoHashShift, prevInfo); prevHashes = fCurData->fHashes->clone(prevMask + 1, prevGen, true); // iterate over current generation rows @@ -2128,7 +2114,6 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) } const auto& pos = fCurData->fHashes->get(idx); - if (fKeysStorage->isFinalized(pos.idx)) { // this row was already merged into newer generation, skip it @@ -2136,8 +2121,9 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) } // now try to find row in the previous generation - uint32_t pinfo = prevInfoInc + static_cast((pos.hash & INFO_MASK) >> prevInfoHashShift); - uint64_t pidx = (pos.hash >> INIT_INFO_BITS) & prevMask; + auto [pinfo, pidx] = + rowHashToIdx(pos.hash, prevMask, prevHashMultiplier, prevInfoInc, prevInfoHashShift); + while (pinfo < prevInfo[pidx]) { ++pidx; @@ -2276,12 +2262,12 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) void RowAggStorage::loadGeneration(uint16_t gen) { - loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->fInfoInc, - fCurData->fInfoHashShift, fCurData->fInfo); + loadGeneration(gen, fCurData->fSize, fCurData->fMask, fCurData->fMaxSize, fCurData->hashMultiplier_, + fCurData->fInfoInc, fCurData->fInfoHashShift, fCurData->fInfo); } void RowAggStorage::loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, - uint32_t& infoInc, uint32_t& infoHashShift, + size_t& hashMultiplier, uint32_t& infoInc, uint32_t& infoHashShift, std::unique_ptr& info) { messageqcpp::ByteStream bs; @@ -2312,6 +2298,7 @@ void RowAggStorage::loadGeneration(uint16_t gen, size_t& size, size_t& mask, siz bs >> size; bs >> mask; bs >> maxSize; + bs >> hashMultiplier; bs >> infoInc; bs >> infoHashShift; size_t infoSz = calcBytes(calcSizeWithBuffer(mask + 1, maxSize)); diff --git a/utils/rowgroup/rowstorage.h b/utils/rowgroup/rowstorage.h index 7277d7872..8b230cf1c 100644 --- a/utils/rowgroup/rowstorage.h +++ b/utils/rowgroup/rowstorage.h @@ -147,34 +147,22 @@ class RowAggStorage */ void shiftUp(size_t startIdx, size_t insIdx); - /** @brief Find best position of row and save it's hash. - * - * @param row(in) input row - * @param info(out) info data - * @param idx(out) index computed from row hash - * @param hash(out) row hash value - */ - void rowToIdx(const Row& row, uint32_t& info, size_t& idx, uint64_t& hash) const; - void rowToIdx(const Row& row, uint32_t& info, size_t& idx, uint64_t& hash, const Data* curData) const; - - /** @brief Find best position using precomputed hash - * - * @param h(in) row hash - * @param info(out) info data - * @param idx(out) index - */ - inline void rowHashToIdx(uint64_t h, uint32_t& info, size_t& idx, const Data* curData) const + using InfoIdxType = std::pair; + inline InfoIdxType rowHashToIdx(uint64_t h, const size_t mask, const uint64_t hashMultiplier, + const uint32_t infoInc, const uint32_t infoHashShift) const { // An addition from the original robin hood HM. - h *= fCurData->hashMultiplier_; + h *= hashMultiplier; h ^= h >> 33U; - info = curData->fInfoInc + static_cast((h & INFO_MASK) >> curData->fInfoHashShift); - idx = (h >> INIT_INFO_BITS) & curData->fMask; + uint32_t info = infoInc + static_cast((h & INFO_MASK) >> infoHashShift); + size_t idx = (h >> INIT_INFO_BITS) & mask; + return {info, idx}; } - inline void rowHashToIdx(uint64_t h, uint32_t& info, size_t& idx) const + inline InfoIdxType rowHashToIdx(uint64_t h) const { - return rowHashToIdx(h, info, idx, fCurData); + return rowHashToIdx(h, fCurData->fMask, fCurData->hashMultiplier_, fCurData->fInfoInc, + fCurData->fInfoHashShift); } /** @brief Iterate over internal info until info with less-or-equal distance @@ -237,13 +225,6 @@ class RowAggStorage info = fCurData->fInfo[idx]; } - void nextHashMultiplier() - { - // adding an *even* number, so that the multiplier will always stay odd. This is necessary - // so that the hash stays a mixing function (and thus doesn't have any information loss). - fCurData->hashMultiplier_ += 0xc4ceb9fe1a85ec54; - } - /** @brief Increase internal data size if needed */ void increaseSize(); @@ -310,8 +291,8 @@ class RowAggStorage */ void loadGeneration(uint16_t gen); /** @brief Load previously dumped data into the tmp storage */ - void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, uint32_t& infoInc, - uint32_t& infoHashShift, std::unique_ptr& info); + void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, size_t& hashMultiplier, + uint32_t& infoInc, uint32_t& infoHashShift, std::unique_ptr& info); /** @brief Remove temporary data files */ void cleanup();