/* Copyright (C) 2021 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #pragma once #include "resourcemanager.h" #include "rowgroup.h" #include "idbcompress.h" #include #include #include #include namespace rowgroup { uint32_t calcNumberOfBuckets(ssize_t availMem, uint32_t numOfThreads, uint32_t numOfBuckets, uint32_t groupsPerThread, uint32_t inRowSize, uint32_t outRowSize, bool enabledDiskAggr); class MemManager; class RowPosHashStorage; using RowPosHashStoragePtr = std::unique_ptr; class RowGroupStorage; using RGDataUnPtr = std::unique_ptr; using PosOpos = std::pair; using FgidTgid = std::pair; uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol); constexpr const size_t MaxConstStrSize = 2048ULL; constexpr const size_t MaxConstStrBufSize = MaxConstStrSize << 1; constexpr const uint64_t HashMaskElements = 64ULL; class RowAggStorage { public: RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, RowGroup* keysRowGroup, uint32_t keyCount, joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessLimit = {}, bool enabledDiskAgg = false, bool allowGenerations = false, compress::CompressInterface* compressor = nullptr); RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, uint32_t keyCount, joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessLimit = {}, bool enabledDiskAgg = false, bool allowGenerations = false, compress::CompressInterface* compressor = nullptr) : RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount, rm, std::move(sessLimit), enabledDiskAgg, allowGenerations, compressor) { } ~RowAggStorage(); static uint16_t getMaxRows(bool enabledDiskAgg) { return (enabledDiskAgg ? 8192 : 256); } static size_t getBucketSize(); /** @brief Find or create resulting row. * * Create "aggregation key" row if necessary. * NB! Using getTargetRow() after append() is UB! * * @param row(in) input row * @param rowOut() row to aggregate data from input row * * @returns true if new row created, false otherwise */ bool getTargetRow(const Row& row, Row& rowOut); bool getTargetRow(const Row& row, uint64_t row_hash, Row& rowOut); /** @brief Dump some RGDatas to disk and release memory for further use. */ void dump(); /** @brief Append RGData from other RowAggStorage and clear it. * * NB! Any operation except getNextRGData() or append() is UB! * * @param other(in) donor storage */ void append(RowAggStorage& other); /** @brief Remove last RGData from internal RGData storage and return it. * * @returns pointer to the next RGData or nullptr if empty */ std::unique_ptr getNextRGData(); /** @brief Remove last RGData from in-memory storage or disk. * Iterates over all generations on disk if available. * @returns True if RGData is returned in parameter or false if no more RGDatas can be returned. */ bool getNextOutputRGData(std::unique_ptr& rgdata); /** @brief TODO * * @param mergeFunc * @param rowOut */ void finalize(std::function mergeFunc, Row& rowOut); /** @brief Calculate maximum size of hash assuming 80% fullness. * * @param elems(in) number of elements * @returns calculated size */ inline static size_t calcMaxSize(size_t elems) noexcept { if (LIKELY(elems <= std::numeric_limits::max() / 100)) return elems * 80 / 100; return (elems / 100) * 80; } inline static size_t calcSizeWithBuffer(size_t elems, size_t maxSize) noexcept { return elems + std::min(maxSize, 0xFFUL); } inline static size_t calcSizeWithBuffer(size_t elems) noexcept { return calcSizeWithBuffer(elems, calcMaxSize(elems)); } private: struct Data; /** @brief Create new RowAggStorage with the same params and load dumped data * * @param gen(in) generation number * @return pointer to a new RowAggStorage */ RowAggStorage* clone(uint16_t gen) const; /** @brief Free any internal data */ void freeData(); /** @brief Move internal data & row position inside [insIdx, startIdx] up by 1. * * @param startIdx(in) last element's index to move * @param insIdx(in) first element's index to move */ void shiftUp(size_t startIdx, size_t insIdx); using InfoIdxType = std::pair; inline InfoIdxType rowHashToIdx(uint64_t h, const size_t mask, const uint64_t hashMultiplier, const uint32_t infoInc, const uint32_t infoHashShift) const { // An addition from the original robin hood HM. h *= hashMultiplier; h ^= h >> 33U; uint32_t info = infoInc + static_cast((h & INFO_MASK) >> infoHashShift); size_t idx = (h >> INIT_INFO_BITS) & mask; return {info, idx}; } inline InfoIdxType rowHashToIdx(uint64_t h) const { return rowHashToIdx(h, fCurData->fMask, fCurData->hashMultiplier_, fCurData->fInfoInc, fCurData->fInfoHashShift); } /** @brief Iterate over internal info until info with less-or-equal distance * from the best position was found. * * @param info(in,out) info data * @param idx(in,out) index */ inline void nextWhileLess(uint32_t& info, size_t& idx, const Data* curData) const noexcept { while (info < curData->fInfo[idx]) { next(info, idx, curData); } } inline void nextWhileLess(uint32_t& info, size_t& idx) const noexcept { return nextWhileLess(info, idx, fCurData); } /** @brief Get next index and corresponding info */ inline void next(uint32_t& info, size_t& idx, const Data* curData) const noexcept { ++(idx); info += curData->fInfoInc; } inline void next(uint32_t& info, size_t& idx) const noexcept { return next(info, idx, fCurData); } /** @brief Get index and info of the next non-empty entry */ inline void nextExisting(uint32_t& info, size_t& idx) const noexcept { uint64_t n = 0; uint64_t data; while (true) { memcpy(&data, fCurData->fInfo.get() + idx, sizeof(data)); if (data == 0) { idx += sizeof(n); } else { break; } } #if BYTE_ORDER == BIG_ENDIAN n = __builtin_clzll(data) / sizeof(data); #else n = __builtin_ctzll(data) / sizeof(data); #endif idx += n; info = fCurData->fInfo[idx]; } /** @brief Increase internal data size if needed */ void increaseSize(); /** @brief Increase distance capacity of info removing 1 bit of the hash. * * @returns success */ bool tryIncreaseInfo(); /** @brief Reserve space for number of elements (power of two) * * This function performs re-insert all data * * @param elems(in) new size */ void rehashPowerOfTwo(size_t elems); /** @brief Move elements from old one into rehashed data. * * It's mostly the same algo as in getTargetRow(), but returns nothing * and skips some checks because it's guaranteed that there is no dups. * * @param oldIdx(in) index of "old" data * @param oldHashes(in) old storage of row positions and hashes */ void insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes); /** @brief (Re)Initialize internal data of specified size. * * @param elems(in) number of elements */ void initData(size_t elems, const RowPosHashStorage* oldHashes); /** @brief Calculate memory size of info data * * @param elems(in) number of elements * @returns size in bytes */ inline static size_t calcBytes(size_t elems) noexcept { return elems + sizeof(uint64_t); } /** @brief Reserve place sufficient for elems * * @param elems(in) number of elements */ void reserve(size_t elems); /** @brief Start new aggregation generation * * Dump all the data on disk, including internal info data, positions & row * hashes, and the rowgroups itself. */ void startNewGeneration(); /** @brief Save internal info data on disk */ void dumpInternalData() const; /** @brief Load previously dumped data from disk * * @param gen(in) generation number */ void loadGeneration(uint16_t gen); /** @brief Load previously dumped data into the tmp storage */ void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, size_t& hashMultiplier, uint32_t& infoInc, uint32_t& infoHashShift, std::unique_ptr& info); /** @brief Remove temporary data files */ void cleanup(); void cleanup(uint16_t gen); /** @brief Remove all temporary data files */ void cleanupAll() noexcept; std::string makeDumpFilename(int32_t gen = -1) const; private: static constexpr size_t INIT_SIZE{sizeof(uint64_t)}; static constexpr uint32_t INIT_INFO_BITS{5}; static constexpr uint8_t INIT_INFO_INC{1U << INIT_INFO_BITS}; static constexpr size_t INFO_MASK{INIT_INFO_INC - 1U}; static constexpr uint8_t INIT_INFO_HASH_SHIFT{0}; static constexpr uint16_t MAX_INMEMORY_GENS{4}; // This is SplitMix64 implementation borrowed from here // https://thompsonsed.co.uk/random-number-generators-for-c-performance-tested inline uint64_t nextRandom() { uint64_t z = (fRandom += UINT64_C(0x9E3779B97F4A7C15)); z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9); z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB); return z ^ (z >> 31); } inline uint64_t nextRandDistib() { return nextRandom() % 100; } struct Data { RowPosHashStoragePtr fHashes; std::unique_ptr fInfo; // This is a power of 2 that controls a potential number of hash buckets // w/o rehashing size_t fSize{0}; size_t fMask{0}; size_t fMaxSize{0}; uint64_t hashMultiplier_{0xc4ceb9fe1a85ec53ULL}; uint32_t fInfoInc{INIT_INFO_INC}; uint32_t fInfoHashShift{INIT_INFO_HASH_SHIFT}; }; std::vector> fGens; Data* fCurData; uint32_t fMaxRows; const bool fExtKeys; std::unique_ptr fStorage; std::unique_ptr fRealKeysStorage; RowGroupStorage* fKeysStorage; uint32_t fLastKeyCol; uint16_t fGeneration{0}; void* fUniqId; Row fKeyRow; std::unique_ptr fMM; uint32_t fNumOfInputRGPerThread; bool fAggregated = true; bool fAllowGenerations; bool fEnabledDiskAggregation; std::unique_ptr fCompressor; std::string fTmpDir; bool fInitialized{false}; rowgroup::RowGroup* fRowGroupOut; rowgroup::RowGroup* fKeysRowGroup; uint64_t fRandom = 0xc4ceb9fe1a85ec53ULL; // initial integer to set PRNG up }; } // namespace rowgroup