You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	Given that idx is a Robin Hood (RH) hashmap bucket number and info is the
    intra-bucket index, the root cause is a mismatch between the idx/info pair
    calculated from a row hash for a given GROUP BY generation and the pair
    calculated when merging generation aggregations in RowAggStorage::finalize.
    This patch generalizes rowHashToIdx so that it can be used in both of the
    cases mentioned above.
		
	
		
			
				
	
	
		
			358 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			358 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2021 MariaDB Corporation
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
#pragma once
 | 
						|
 | 
						|
#include "resourcemanager.h"
 | 
						|
#include "rowgroup.h"
 | 
						|
#include "idbcompress.h"
 | 
						|
#include <random>
 | 
						|
#include <sys/stat.h>
 | 
						|
#include <unistd.h>
 | 
						|
 | 
						|
namespace rowgroup
 | 
						|
{
 | 
						|
uint32_t calcNumberOfBuckets(ssize_t availMem, uint32_t numOfThreads, uint32_t numOfBuckets,
 | 
						|
                             uint32_t groupsPerThread, uint32_t inRowSize, uint32_t outRowSize,
 | 
						|
                             bool enabledDiskAggr);
 | 
						|
 | 
						|
class MemManager;
 | 
						|
class RowPosHashStorage;
 | 
						|
using RowPosHashStoragePtr = std::unique_ptr<RowPosHashStorage>;
 | 
						|
class RowGroupStorage;
 | 
						|
 | 
						|
uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol);
 | 
						|
 | 
						|
constexpr const size_t MaxConstStrSize = 2048ULL;
 | 
						|
constexpr const size_t MaxConstStrBufSize = MaxConstStrSize << 1;
 | 
						|
 | 
						|
class RowAggStorage
 | 
						|
{
 | 
						|
 public:
 | 
						|
  RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, RowGroup* keysRowGroup, uint32_t keyCount,
 | 
						|
                joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessLimit = {},
 | 
						|
                bool enabledDiskAgg = false, bool allowGenerations = false,
 | 
						|
                compress::CompressInterface* compressor = nullptr);
 | 
						|
 | 
						|
  RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, uint32_t keyCount,
 | 
						|
                joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessLimit = {},
 | 
						|
                bool enabledDiskAgg = false, bool allowGenerations = false,
 | 
						|
                compress::CompressInterface* compressor = nullptr)
 | 
						|
   : RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount, rm, std::move(sessLimit), enabledDiskAgg,
 | 
						|
                   allowGenerations, compressor)
 | 
						|
  {
 | 
						|
  }
 | 
						|
 | 
						|
  ~RowAggStorage();
 | 
						|
 | 
						|
  static uint16_t getMaxRows(bool enabledDiskAgg)
 | 
						|
  {
 | 
						|
    return (enabledDiskAgg ? 8192 : 256);
 | 
						|
  }
 | 
						|
 | 
						|
  static size_t getBucketSize();
 | 
						|
 | 
						|
  /** @brief Find or create resulting row.
 | 
						|
   *
 | 
						|
   *    Create "aggregation key" row if necessary.
 | 
						|
   *    NB! Using getTargetRow() after append() is UB!
 | 
						|
   *
 | 
						|
   *  @param row(in)  input row
 | 
						|
   *  @param rowOut() row to aggregate data from input row
 | 
						|
   *
 | 
						|
   *  @returns true if new row created, false otherwise
 | 
						|
   */
 | 
						|
  bool getTargetRow(const Row& row, Row& rowOut);
 | 
						|
  bool getTargetRow(const Row& row, uint64_t row_hash, Row& rowOut);
 | 
						|
 | 
						|
  /** @brief Dump some RGDatas to disk and release memory for further use.
 | 
						|
   */
 | 
						|
  void dump();
 | 
						|
 | 
						|
  /** @brief Append RGData from other RowAggStorage and clear it.
 | 
						|
   *
 | 
						|
   *    NB! Any operation except getNextRGData() or append() is UB!
 | 
						|
   *
 | 
						|
   * @param other(in) donor storage
 | 
						|
   */
 | 
						|
  void append(RowAggStorage& other);
 | 
						|
 | 
						|
  /** @brief Remove last RGData from internal RGData storage and return it.
 | 
						|
   *
 | 
						|
   * @returns pointer to the next RGData or nullptr if empty
 | 
						|
   */
 | 
						|
  std::unique_ptr<RGData> getNextRGData();
 | 
						|
 | 
						|
  /** @brief TODO
 | 
						|
   *
 | 
						|
   * @param mergeFunc
 | 
						|
   * @param rowOut
 | 
						|
   */
 | 
						|
  void finalize(std::function<void(Row&)> mergeFunc, Row& rowOut);
 | 
						|
 | 
						|
  /** @brief Calculate maximum size of hash assuming 80% fullness.
 | 
						|
   *
 | 
						|
   * @param elems(in) number of elements
 | 
						|
   * @returns calculated size
 | 
						|
   */
 | 
						|
  inline static size_t calcMaxSize(size_t elems) noexcept
 | 
						|
  {
 | 
						|
    if (LIKELY(elems <= std::numeric_limits<size_t>::max() / 100))
 | 
						|
      return elems * 80 / 100;
 | 
						|
 | 
						|
    return (elems / 100) * 80;
 | 
						|
  }
 | 
						|
 | 
						|
  inline static size_t calcSizeWithBuffer(size_t elems, size_t maxSize) noexcept
 | 
						|
  {
 | 
						|
    return elems + std::min(maxSize, 0xFFUL);
 | 
						|
  }
 | 
						|
 | 
						|
  inline static size_t calcSizeWithBuffer(size_t elems) noexcept
 | 
						|
  {
 | 
						|
    return calcSizeWithBuffer(elems, calcMaxSize(elems));
 | 
						|
  }
 | 
						|
 | 
						|
 private:
 | 
						|
  struct Data;
 | 
						|
  /** @brief Create new RowAggStorage with the same params and load dumped data
 | 
						|
   *
 | 
						|
   * @param gen(in) generation number
 | 
						|
   * @return pointer to a new RowAggStorage
 | 
						|
   */
 | 
						|
  RowAggStorage* clone(uint16_t gen) const;
 | 
						|
 | 
						|
  /** @brief Free any internal data
 | 
						|
   */
 | 
						|
  void freeData();
 | 
						|
 | 
						|
  /** @brief Move internal data & row position inside [insIdx, startIdx] up by 1.
 | 
						|
   *
 | 
						|
   * @param startIdx(in) last element's index to move
 | 
						|
   * @param insIdx(in)   first element's index to move
 | 
						|
   */
 | 
						|
  void shiftUp(size_t startIdx, size_t insIdx);
 | 
						|
 | 
						|
  using InfoIdxType = std::pair<uint32_t, size_t>;
 | 
						|
  inline InfoIdxType rowHashToIdx(uint64_t h, const size_t mask, const uint64_t hashMultiplier,
 | 
						|
                                  const uint32_t infoInc, const uint32_t infoHashShift) const
 | 
						|
  {
 | 
						|
    // An addition from the original robin hood HM.
 | 
						|
    h *= hashMultiplier;
 | 
						|
    h ^= h >> 33U;
 | 
						|
    uint32_t info = infoInc + static_cast<uint32_t>((h & INFO_MASK) >> infoHashShift);
 | 
						|
    size_t idx = (h >> INIT_INFO_BITS) & mask;
 | 
						|
    return {info, idx};
 | 
						|
  }
 | 
						|
 | 
						|
  inline InfoIdxType rowHashToIdx(uint64_t h) const
 | 
						|
  {
 | 
						|
    return rowHashToIdx(h, fCurData->fMask, fCurData->hashMultiplier_, fCurData->fInfoInc,
 | 
						|
                        fCurData->fInfoHashShift);
 | 
						|
  }
 | 
						|
 | 
						|
  /** @brief Iterate over internal info until info with less-or-equal distance
 | 
						|
   *         from the best position was found.
 | 
						|
   *
 | 
						|
   * @param info(in,out) info data
 | 
						|
   * @param idx(in,out)  index
 | 
						|
   */
 | 
						|
  inline void nextWhileLess(uint32_t& info, size_t& idx, const Data* curData) const noexcept
 | 
						|
  {
 | 
						|
    while (info < curData->fInfo[idx])
 | 
						|
    {
 | 
						|
      next(info, idx, curData);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  inline void nextWhileLess(uint32_t& info, size_t& idx) const noexcept
 | 
						|
  {
 | 
						|
    return nextWhileLess(info, idx, fCurData);
 | 
						|
  }
 | 
						|
 | 
						|
  /** @brief Get next index and corresponding info
 | 
						|
   */
 | 
						|
  inline void next(uint32_t& info, size_t& idx, const Data* curData) const noexcept
 | 
						|
  {
 | 
						|
    ++(idx);
 | 
						|
    info += curData->fInfoInc;
 | 
						|
  }
 | 
						|
 | 
						|
  inline void next(uint32_t& info, size_t& idx) const noexcept
 | 
						|
  {
 | 
						|
    return next(info, idx, fCurData);
 | 
						|
  }
 | 
						|
 | 
						|
  /** @brief Get index and info of the next non-empty entry
 | 
						|
   */
 | 
						|
  inline void nextExisting(uint32_t& info, size_t& idx) const noexcept
 | 
						|
  {
 | 
						|
    uint64_t n = 0;
 | 
						|
    uint64_t data;
 | 
						|
    while (true)
 | 
						|
    {
 | 
						|
      memcpy(&data, fCurData->fInfo.get() + idx, sizeof(data));
 | 
						|
      if (data == 0)
 | 
						|
      {
 | 
						|
        idx += sizeof(n);
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
#if BYTE_ORDER == BIG_ENDIAN
 | 
						|
    n = __builtin_clzll(data) / sizeof(data);
 | 
						|
#else
 | 
						|
    n = __builtin_ctzll(data) / sizeof(data);
 | 
						|
#endif
 | 
						|
    idx += n;
 | 
						|
    info = fCurData->fInfo[idx];
 | 
						|
  }
 | 
						|
 | 
						|
  /** @brief Increase internal data size if needed
 | 
						|
   */
 | 
						|
  void increaseSize();
 | 
						|
 | 
						|
  /** @brief Increase distance capacity of info removing 1 bit of the hash.
 | 
						|
   *
 | 
						|
   * @returns success
 | 
						|
   */
 | 
						|
  bool tryIncreaseInfo();
 | 
						|
 | 
						|
  /** @brief Reserve space for number of elements (power of two)
 | 
						|
   *
 | 
						|
   *    This function performs re-insert all data
 | 
						|
   *
 | 
						|
   * @param elems(in)   new size
 | 
						|
   */
 | 
						|
  void rehashPowerOfTwo(size_t elems);
 | 
						|
 | 
						|
  /** @brief Move elements from old one into rehashed data.
 | 
						|
   *
 | 
						|
   *    It's mostly the same algo as in getTargetRow(), but returns nothing
 | 
						|
   *    and skips some checks because it's guaranteed that there is no dups.
 | 
						|
   *
 | 
						|
   * @param oldIdx(in)    index of "old" data
 | 
						|
   * @param oldHashes(in) old storage of row positions and hashes
 | 
						|
   */
 | 
						|
  void insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes);
 | 
						|
 | 
						|
  /** @brief (Re)Initialize internal data of specified size.
 | 
						|
   *
 | 
						|
   * @param elems(in) number of elements
 | 
						|
   */
 | 
						|
  void initData(size_t elems, const RowPosHashStorage* oldHashes);
 | 
						|
 | 
						|
  /** @brief Calculate memory size of info data
 | 
						|
   *
 | 
						|
   * @param elems(in) number of elements
 | 
						|
   * @returns size in bytes
 | 
						|
   */
 | 
						|
  inline static size_t calcBytes(size_t elems) noexcept
 | 
						|
  {
 | 
						|
    return elems + sizeof(uint64_t);
 | 
						|
  }
 | 
						|
 | 
						|
  /** @brief Reserve place sufficient for elems
 | 
						|
   *
 | 
						|
   * @param elems(in) number of elements
 | 
						|
   */
 | 
						|
  void reserve(size_t elems);
 | 
						|
 | 
						|
  /** @brief Start new aggregation generation
 | 
						|
   *
 | 
						|
   * Dump all the data on disk, including internal info data, positions & row
 | 
						|
   * hashes, and the rowgroups itself.
 | 
						|
   */
 | 
						|
  void startNewGeneration();
 | 
						|
 | 
						|
  /** @brief Save internal info data on disk */
 | 
						|
  void dumpInternalData() const;
 | 
						|
 | 
						|
  /** @brief Load previously dumped data from disk
 | 
						|
   *
 | 
						|
   * @param gen(in) generation number
 | 
						|
   */
 | 
						|
  void loadGeneration(uint16_t gen);
 | 
						|
  /** @brief Load previously dumped data into the tmp storage */
 | 
						|
  void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, size_t& hashMultiplier,
 | 
						|
                      uint32_t& infoInc, uint32_t& infoHashShift, std::unique_ptr<uint8_t[]>& info);
 | 
						|
 | 
						|
  /** @brief Remove temporary data files */
 | 
						|
  void cleanup();
 | 
						|
  void cleanup(uint16_t gen);
 | 
						|
 | 
						|
  /** @brief Remove all temporary data files */
 | 
						|
  void cleanupAll() noexcept;
 | 
						|
 | 
						|
  std::string makeDumpFilename(int32_t gen = -1) const;
 | 
						|
 | 
						|
 private:
 | 
						|
  static constexpr size_t INIT_SIZE{sizeof(uint64_t)};
 | 
						|
  static constexpr uint32_t INIT_INFO_BITS{5};
 | 
						|
  static constexpr uint8_t INIT_INFO_INC{1U << INIT_INFO_BITS};
 | 
						|
  static constexpr size_t INFO_MASK{INIT_INFO_INC - 1U};
 | 
						|
  static constexpr uint8_t INIT_INFO_HASH_SHIFT{0};
 | 
						|
  static constexpr uint16_t MAX_INMEMORY_GENS{4};
 | 
						|
 | 
						|
  struct Data
 | 
						|
  {
 | 
						|
    RowPosHashStoragePtr fHashes;
 | 
						|
    std::unique_ptr<uint8_t[]> fInfo;
 | 
						|
    // This is a power of 2 that controls a potential number of hash buckets
 | 
						|
    // w/o rehashing
 | 
						|
    size_t fSize{0};
 | 
						|
    size_t fMask{0};
 | 
						|
    size_t fMaxSize{0};
 | 
						|
    uint64_t hashMultiplier_{0xc4ceb9fe1a85ec53ULL};
 | 
						|
    uint32_t fInfoInc{INIT_INFO_INC};
 | 
						|
    uint32_t fInfoHashShift{INIT_INFO_HASH_SHIFT};
 | 
						|
  };
 | 
						|
  std::vector<std::unique_ptr<Data>> fGens;
 | 
						|
  Data* fCurData;
 | 
						|
  uint32_t fMaxRows;
 | 
						|
  const bool fExtKeys;
 | 
						|
 | 
						|
  std::unique_ptr<RowGroupStorage> fStorage;
 | 
						|
  std::unique_ptr<RowGroupStorage> fRealKeysStorage;
 | 
						|
  RowGroupStorage* fKeysStorage;
 | 
						|
  uint32_t fLastKeyCol;
 | 
						|
 | 
						|
  uint16_t fGeneration{0};
 | 
						|
  void* fUniqId;
 | 
						|
 | 
						|
  Row fKeyRow;
 | 
						|
 | 
						|
  std::unique_ptr<MemManager> fMM;
 | 
						|
  uint32_t fNumOfInputRGPerThread;
 | 
						|
  bool fAggregated = true;
 | 
						|
  bool fAllowGenerations;
 | 
						|
  bool fEnabledDiskAggregation;
 | 
						|
  std::unique_ptr<compress::CompressInterface> fCompressor;
 | 
						|
  std::string fTmpDir;
 | 
						|
  bool fInitialized{false};
 | 
						|
  rowgroup::RowGroup* fRowGroupOut;
 | 
						|
  rowgroup::RowGroup* fKeysRowGroup;
 | 
						|
  std::random_device fRD;
 | 
						|
  std::mt19937 fRandGen;
 | 
						|
  std::uniform_int_distribution<uint8_t> fRandDistr;
 | 
						|
};
 | 
						|
 | 
						|
}  // namespace rowgroup
 |