1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/utils/rowgroup/rowstorage.h
Alexey Antipovsky bd0f59910a feat(PrimProc): MCOL-5950 Improve disk-based aggregation finalization (#3525)
* feat(PrimProc): MCOL-5950 Improve disk-based aggregation finalization

    Iterate over the rows in the plain vector of RGData instead of
    iterating over the hashmap. This reduces the complexity and speeds
    up finalization (by up to a factor of two in certain cases).

* replace magic constant with muggle constant
2025-05-21 10:53:48 +01:00

383 lines
11 KiB
C++

/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include "resourcemanager.h"
#include "rowgroup.h"
#include "idbcompress.h"
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm>
#include <climits>
#include <cstdint>
#include <cstring>
#include <functional>
#include <limits>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <vector>
namespace rowgroup
{
/** @brief Declaration only; the definition is not part of this header.
 *
 * NOTE(review): judging by the name and signature this presumably derives a
 * number of hash buckets from the available memory, thread count and row
 * sizes -- confirm against the definition before relying on it.
 */
uint32_t calcNumberOfBuckets(ssize_t availMem, uint32_t numOfThreads, uint32_t numOfBuckets,
uint32_t groupsPerThread, uint32_t inRowSize, uint32_t outRowSize,
bool enabledDiskAggr);
// Forward declarations: this header only holds pointers / unique_ptrs to
// these types, so their full definitions are not needed here.
class MemManager;
class RowPosHashStorage;
using RowPosHashStoragePtr = std::unique_ptr<RowPosHashStorage>;
class RowGroupStorage;
// Owning pointer to a single RGData.
using RGDataUnPtr = std::unique_ptr<RGData>;
// Pairs of 64-bit values; not used elsewhere in this header.
// NOTE(review): the meaning of each component is not visible here -- confirm at the use sites.
using PosOpos = std::pair<uint64_t, uint64_t>;
using FgidTgid = std::pair<uint64_t, uint64_t>;
// Returns a 64-bit hash for row r (declared here, defined elsewhere).
// NOTE(review): lastCol is presumably the index of the last column that
// takes part in the hash -- confirm against the definition.
uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol);
// Size limits shared through this header.
// NOTE(review): the consumers of these constants are not visible in this
// header -- check their use sites before changing the values.
constexpr size_t MaxConstStrSize = 2048;
// Always exactly twice MaxConstStrSize.
constexpr size_t MaxConstStrBufSize = 2 * MaxConstStrSize;
constexpr uint64_t HashMaskElements = 64;
/** @brief Storage of aggregation result rows addressed through a hash table.
 *
 * The hashing scheme is derived from a robin hood hash map (see
 * rowHashToIdx()): every slot has one "info" byte in Data::fInfo and a zero
 * byte marks an empty slot (see nextExisting()). Data can be dumped to disk
 * and split into "generations" that are merged back by finalize().
 */
class RowAggStorage
{
 public:
  RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, RowGroup* keysRowGroup, uint32_t keyCount,
                joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessLimit = {},
                bool enabledDiskAgg = false, bool allowGenerations = false,
                compress::CompressInterface* compressor = nullptr);

  /** @brief Convenience overload: rowGroupOut is used as the keys row group too.
   */
  RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, uint32_t keyCount,
                joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessLimit = {},
                bool enabledDiskAgg = false, bool allowGenerations = false,
                compress::CompressInterface* compressor = nullptr)
   : RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount, rm, std::move(sessLimit), enabledDiskAgg,
                   allowGenerations, compressor)
  {
  }

  ~RowAggStorage();

  /** @brief Maximum row count for the given mode.
   *
   * @param enabledDiskAgg(in) whether disk-based aggregation is enabled
   * @returns 8192 if disk aggregation is enabled, 256 otherwise
   */
  static uint16_t getMaxRows(bool enabledDiskAgg)
  {
    return (enabledDiskAgg ? 8192 : 256);
  }

  static size_t getBucketSize();

  /** @brief Find or create resulting row.
   *
   * Create "aggregation key" row if necessary.
   * NB! Using getTargetRow() after append() is UB!
   *
   * @param row(in) input row
   * @param rowOut(out) row to aggregate data from the input row into
   *
   * @returns true if new row created, false otherwise
   */
  bool getTargetRow(const Row& row, Row& rowOut);
  bool getTargetRow(const Row& row, uint64_t row_hash, Row& rowOut);

  /** @brief Dump some RGDatas to disk and release memory for further use.
   */
  void dump();

  /** @brief Append RGData from other RowAggStorage and clear it.
   *
   * NB! Any operation except getNextRGData() or append() is UB!
   *
   * @param other(in) donor storage
   */
  void append(RowAggStorage& other);

  /** @brief Remove last RGData from internal RGData storage and return it.
   *
   * @returns pointer to the next RGData or nullptr if empty
   */
  std::unique_ptr<RGData> getNextRGData();

  /** @brief Remove last RGData from in-memory storage or disk.
   * Iterates over all generations on disk if available.
   * @returns True if RGData is returned in parameter or false if no more RGDatas can be returned.
   */
  bool getNextOutputRGData(std::unique_ptr<RGData>& rgdata);

  /** @brief Merge generations together
   *
   * @param mergeFunc
   * @param rowOut
   */
  void finalize(std::function<void(Row&)> mergeFunc, Row& rowOut);

  /** @brief Calculate maximum size of hash assuming 80% fullness.
   *
   * @param elems(in) number of elements
   * @returns calculated size
   */
  inline static size_t calcMaxSize(size_t elems) noexcept
  {
    // Multiply first while that cannot overflow (better precision) ...
    if (LIKELY(elems <= std::numeric_limits<size_t>::max() / 100))
      return elems * 80 / 100;

    // ... and divide first for huge values.
    return (elems / 100) * 80;
  }

  /** @brief Number of slots including the overflow buffer.
   *
   * @param elems(in) number of elements
   * @param maxSize(in) maximum number of used elements
   * @returns elems plus min(maxSize, 255)
   */
  inline static size_t calcSizeWithBuffer(size_t elems, size_t maxSize) noexcept
  {
    // The explicit template argument keeps this compiling on platforms where
    // size_t is not `unsigned long`.
    return elems + std::min<size_t>(maxSize, 0xFF);
  }

  inline static size_t calcSizeWithBuffer(size_t elems) noexcept
  {
    return calcSizeWithBuffer(elems, calcMaxSize(elems));
  }

 private:
  struct Data;

  /** @brief Create new RowAggStorage with the same params and load dumped data
   *
   * @param gen(in) generation number
   * @return pointer to a new RowAggStorage
   */
  RowAggStorage* clone(uint16_t gen) const;

  /** @brief Free any internal data
   */
  void freeData();

  /** @brief Move internal data & row position inside [insIdx, startIdx] up by 1.
   *
   * @param startIdx(in) last element's index to move
   * @param insIdx(in) first element's index to move
   */
  void shiftUp(size_t startIdx, size_t insIdx);

  using InfoIdxType = std::pair<uint32_t, size_t>;

  /** @brief Convert a row hash into the initial (info, index) pair.
   *
   * @param h(in) row hash
   * @param mask(in) mask applied to get the slot index
   * @param hashMultiplier(in) multiplier used to mix the hash
   * @param infoInc(in) info increment, added to the hash-derived info bits
   * @param infoHashShift(in) right shift applied to the hash-derived info bits
   * @returns {info, index}
   */
  inline InfoIdxType rowHashToIdx(uint64_t h, const size_t mask, const uint64_t hashMultiplier,
                                  const uint32_t infoInc, const uint32_t infoHashShift) const
  {
    // An addition from the original robin hood HM.
    h *= hashMultiplier;
    h ^= h >> 33U;
    // The low INIT_INFO_BITS bits of the mixed hash go into info, the
    // remaining bits select the slot.
    uint32_t info = infoInc + static_cast<uint32_t>((h & INFO_MASK) >> infoHashShift);
    size_t idx = (h >> INIT_INFO_BITS) & mask;
    return {info, idx};
  }

  /** @brief Same as above using the parameters of the current data (fCurData).
   */
  inline InfoIdxType rowHashToIdx(uint64_t h) const
  {
    return rowHashToIdx(h, fCurData->fMask, fCurData->hashMultiplier_, fCurData->fInfoInc,
                        fCurData->fInfoHashShift);
  }

  /** @brief Iterate over internal info until info with less-or-equal distance
   *         from the best position was found.
   *
   * @param info(in,out) info data
   * @param idx(in,out) index
   * @param curData(in) data to iterate over
   */
  inline void nextWhileLess(uint32_t& info, size_t& idx, const Data* curData) const noexcept
  {
    while (info < curData->fInfo[idx])
    {
      next(info, idx, curData);
    }
  }

  inline void nextWhileLess(uint32_t& info, size_t& idx) const noexcept
  {
    return nextWhileLess(info, idx, fCurData);
  }

  /** @brief Get next index and corresponding info
   */
  inline void next(uint32_t& info, size_t& idx, const Data* curData) const noexcept
  {
    ++(idx);
    info += curData->fInfoInc;
  }

  inline void next(uint32_t& info, size_t& idx) const noexcept
  {
    return next(info, idx, fCurData);
  }

  /** @brief Get index and info of the next non-empty entry
   *
   * Scans the info bytes of fCurData starting at idx, 8 bytes at a time.
   * There is no bounds check here.
   * NOTE(review): this relies on a non-zero info byte existing at or after
   * idx, presumably a sentinel written by initData() into the extra
   * sizeof(uint64_t) bytes that calcBytes() accounts for -- confirm.
   *
   * @param info(out) info of the found entry
   * @param idx(in,out) start index on input, index of the found entry on output
   */
  inline void nextExisting(uint32_t& info, size_t& idx) const noexcept
  {
    uint64_t data;
    while (true)
    {
      // Load the next 8 info bytes as one word; memcpy keeps the load
      // well-defined whatever the alignment of fInfo + idx is.
      memcpy(&data, fCurData->fInfo.get() + idx, sizeof(data));
      if (data != 0)
      {
        break;
      }
      // All 8 slots are empty, skip the whole word.
      idx += sizeof(data);
    }

    // Offset (in bytes) of the first non-zero byte inside the loaded word.
    // The compiler-provided macros are used instead of BYTE_ORDER/BIG_ENDIAN:
    // when those are not defined the preprocessor compares 0 == 0 and would
    // silently select the big-endian branch.
#if defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
    const uint64_t n = __builtin_clzll(data) / CHAR_BIT;
#else
    const uint64_t n = __builtin_ctzll(data) / CHAR_BIT;
#endif
    idx += n;
    info = fCurData->fInfo[idx];
  }

  /** @brief Increase internal data size if needed
   */
  void increaseSize();

  /** @brief Increase distance capacity of info removing 1 bit of the hash.
   *
   * @returns success
   */
  bool tryIncreaseInfo();

  /** @brief Reserve space for number of elements (power of two)
   *
   * This function re-inserts all data
   *
   * @param elems(in) new size
   */
  void rehashPowerOfTwo(size_t elems);

  /** @brief Move elements from old one into rehashed data.
   *
   * It's mostly the same algo as in getTargetRow(), but returns nothing
   * and skips some checks because it's guaranteed that there is no dups.
   *
   * @param oldIdx(in) index of "old" data
   * @param oldHashes(in) old storage of row positions and hashes
   */
  void insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes);

  /** @brief (Re)Initialize internal data of specified size.
   *
   * @param elems(in) number of elements
   * @param oldHashes(in) old storage of row positions and hashes
   */
  void initData(size_t elems, const RowPosHashStorage* oldHashes);

  /** @brief Calculate memory size of info data
   *
   * One byte per element plus sizeof(uint64_t) extra bytes.
   *
   * @param elems(in) number of elements
   * @returns size in bytes
   */
  inline static size_t calcBytes(size_t elems) noexcept
  {
    return elems + sizeof(uint64_t);
  }

  /** @brief Reserve place sufficient for elems
   *
   * @param elems(in) number of elements
   */
  void reserve(size_t elems);

  /** @brief Start new aggregation generation
   *
   * Dump all the data on disk, including internal info data, positions & row
   * hashes, and the rowgroups itself.
   */
  void startNewGeneration();

  /** @brief Save internal info data on disk */
  void dumpInternalData() const;

  /** @brief Load previously dumped data from disk
   *
   * @param gen(in) generation number
   */
  void loadGeneration(uint16_t gen);

  /** @brief Load previously dumped data into the tmp storage */
  void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, size_t& hashMultiplier,
                      uint32_t& infoInc, uint32_t& infoHashShift, std::unique_ptr<uint8_t[]>& info);

  /** @brief Remove temporary data files */
  void cleanup();
  void cleanup(uint16_t gen);

  /** @brief Remove all temporary data files */
  void cleanupAll() noexcept;

  std::string makeDumpFilename(int32_t gen = -1) const;

 private:
  static constexpr size_t INIT_SIZE{sizeof(uint64_t)};
  // Number of low hash bits that initially go into the info byte.
  static constexpr uint32_t INIT_INFO_BITS{5};
  static constexpr uint8_t INIT_INFO_INC{1U << INIT_INFO_BITS};
  static constexpr size_t INFO_MASK{INIT_INFO_INC - 1U};
  static constexpr uint8_t INIT_INFO_HASH_SHIFT{0};
  static constexpr uint16_t MAX_INMEMORY_GENS{4};

  // This is SplitMix64 implementation borrowed from here
  // https://thompsonsed.co.uk/random-number-generators-for-c-performance-tested
  inline uint64_t nextRandom()
  {
    uint64_t z = (fRandom += UINT64_C(0x9E3779B97F4A7C15));
    z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9);
    z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB);
    return z ^ (z >> 31);
  }

  // Next pseudo-random value reduced to the range [0, 100).
  inline uint64_t nextRandDistib()
  {
    return nextRandom() % 100;
  }

  // Hash-table state: one instance per entry of fGens.
  struct Data
  {
    RowPosHashStoragePtr fHashes;
    // One info byte per slot; a zero byte is treated as an empty slot.
    std::unique_ptr<uint8_t[]> fInfo;
    // This is a power of 2 that controls a potential number of hash buckets
    // w/o rehashing
    size_t fSize{0};
    size_t fMask{0};
    size_t fMaxSize{0};
    uint64_t hashMultiplier_{0xc4ceb9fe1a85ec53ULL};
    uint32_t fInfoInc{INIT_INFO_INC};
    uint32_t fInfoHashShift{INIT_INFO_HASH_SHIFT};
  };

  std::vector<std::unique_ptr<Data>> fGens;
  // Non-owning. NOTE(review): presumably points at an element of fGens -- confirm in the .cpp.
  Data* fCurData;
  uint32_t fMaxRows;
  const bool fExtKeys;

  std::unique_ptr<RowGroupStorage> fStorage;
  std::unique_ptr<RowGroupStorage> fRealKeysStorage;
  // Non-owning.
  RowGroupStorage* fKeysStorage;

  uint32_t fLastKeyCol;

  uint16_t fGeneration{0};
  void* fUniqId;

  Row fKeyRow;

  std::unique_ptr<MemManager> fMM;
  uint32_t fNumOfInputRGPerThread;
  bool fAggregated = true;
  bool fAllowGenerations;
  bool fEnabledDiskAggregation;
  std::unique_ptr<compress::CompressInterface> fCompressor;
  std::string fTmpDir;
  bool fInitialized{false};
  rowgroup::RowGroup* fRowGroupOut;
  rowgroup::RowGroup* fKeysRowGroup;
  uint64_t fRandom = 0xc4ceb9fe1a85ec53ULL;  // initial integer to set PRNG up
};
} // namespace rowgroup