1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Reapply "fix(aggregation, disk-based) MCOL-5691 distinct aggregate disk based (#3145)"

This reverts commit a5c12b98d7.
This commit is contained in:
Aleksei Antipovskii
2024-12-06 11:32:36 +01:00
committed by drrtuy
parent 5e5d328269
commit e0a01c6cf4
7 changed files with 417 additions and 144 deletions

View File

@ -57,7 +57,7 @@
#include "rowstorage.h"
//..comment out NDEBUG to enable assertions, uncomment NDEBUG to disable
//#define NDEBUG
// #define NDEBUG
#include "mcs_decimal.h"
using namespace std;
@ -315,7 +315,7 @@ void RowAggregation::updateStringMinMax(utils::NullString val1, utils::NullStrin
if (val1.isNull())
{
// as any comparison with NULL is false, it should not affect min/max ranges.
return ; // do nothing.
return; // do nothing.
}
CHARSET_INFO* cs = fRow.getCharset(col);
int tmp = cs->strnncoll(val1.str(), val1.length(), val2.str(), val2.length());
@ -810,8 +810,9 @@ void RowAggregation::aggregateRow(Row& row, const uint64_t* hash,
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
{
uint32_t cnt = fRollupFlag ? fGroupByCols.size() : 1;
for (uint32_t z = 0; z < cnt; z++) {
// groupby column list is not empty, find the entry.
for (uint32_t z = 0; z < cnt; z++)
{
// groupby column list is not empty, find the entry.
if (!fGroupByCols.empty())
{
bool is_new_row;
@ -856,7 +857,8 @@ void RowAggregation::aggregateRow(Row& row, const uint64_t* hash,
updateEntry(row, rgContextColl);
// these quantities are unsigned and comparing z and cnt - 1 can be incorrect
// because cnt can be zero.
if ((z + 1 < cnt)) {
if ((z + 1 < cnt))
{
// if we are rolling up, we mark appropriate field as NULL and also increment
// value in the "mark" column, so that we can differentiate between data and
// various rollups.
@ -1169,8 +1171,8 @@ void RowAggregation::doMinMax(const Row& rowIn, int64_t colIn, int64_t colOut, i
{
if (LIKELY(rowIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
{
updateIntMinMax(rowIn.getTSInt128Field(colIn).getValue(), fRow.getTSInt128Field(colOut).getValue(), colOut,
funcType);
updateIntMinMax(rowIn.getTSInt128Field(colIn).getValue(), fRow.getTSInt128Field(colOut).getValue(),
colOut, funcType);
}
else if (rowIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
{
@ -2120,10 +2122,9 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
long double mean = fRow.getLongDoubleField(colAux);
long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1);
volatile long double delta = valIn - mean;
mean += delta/count;
mean += delta / count;
scaledMomentum2 += delta * (valIn - mean);
fRow.setDoubleField(count, colOut);
fRow.setLongDoubleField(mean, colAux);
fRow.setLongDoubleField(scaledMomentum2, colAux + 1);
@ -2173,8 +2174,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int
cc = dynamic_cast<execplan::ConstantColumn*>(fFunctionCols[funcColsIdx]->fpConstCol.get());
}
if ((cc && cc->isNull()) ||
(!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
if ((cc && cc->isNull()) || (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
{
if (udafContextsColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
{
@ -2500,7 +2500,8 @@ void RowAggregation::loadEmptySet(messageqcpp::ByteStream& bs)
//------------------------------------------------------------------------------
RowAggregationUM::RowAggregationUM(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit, bool withRollup)
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit,
bool withRollup)
: RowAggregation(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup)
, fHasAvg(false)
, fHasStatsFunc(false)
@ -3228,7 +3229,7 @@ void RowAggregationUM::SetUDAFAnyValue(static_any::any& valOut, int64_t colOut)
case execplan::CalpontSystemCatalog::CHAR:
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT: fRow.setStringField(strOut, colOut); break;
case execplan::CalpontSystemCatalog::TEXT: fRow.setStringField(strOut, colOut); break;
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::CLOB:
@ -4220,13 +4221,26 @@ bool RowAggregationUM::nextRowGroup()
return more;
}
// Fetches the next output RGData from fRowAggStorage into fCurRGData and, when one
// was obtained, points fRowGroupOut at it. Returns false once the storage reports
// that nothing is left to output.
bool RowAggregationUM::nextOutputRowGroup()
{
  if (!fRowAggStorage->getNextOutputRGData(fCurRGData))
  {
    return false;
  }

  fRowGroupOut->setData(fCurRGData.get());
  return true;
}
//------------------------------------------------------------------------------
// Row Aggregation constructor used on UM
// For 2nd phase of two-phase case, from partial RG to final aggregated RG
//------------------------------------------------------------------------------
RowAggregationUMP2::RowAggregationUMP2(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit, bool withRollup)
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit,
bool withRollup)
: RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup)
{
}
@ -4450,7 +4464,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
{
if (LIKELY(cnt > 0))
{
int128_t valOut = fRow.getTSInt128Field(colOut).getValue();;
int128_t valOut = fRow.getTSInt128Field(colOut).getValue();
int128_t sum = valOut + wideValue;
fRow.setInt128Field(sum, colOut);
fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
@ -4509,7 +4523,8 @@ void RowAggregationUMP2::doStatistics(const Row& rowIn, int64_t colIn, int64_t c
{
volatile long double delta = mean - blockMean;
nextMean = (mean * count + blockMean * blockCount) / nextCount;
nextScaledMomentum2 = scaledMomentum2 + blockScaledMomentum2 + delta * delta * (count * blockCount / nextCount);
nextScaledMomentum2 =
scaledMomentum2 + blockScaledMomentum2 + delta * delta * (count * blockCount / nextCount);
}
fRow.setDoubleField(nextCount, colOut);
fRow.setLongDoubleField(nextMean, colAux);
@ -4682,19 +4697,29 @@ void RowAggregationDistinct::addRowGroup(const RowGroup* pRows,
//------------------------------------------------------------------------------
void RowAggregationDistinct::doDistinctAggregation()
{
while (dynamic_cast<RowAggregationUM*>(fAggregator.get())->nextRowGroup())
auto* umAggregator = dynamic_cast<RowAggregationUM*>(fAggregator.get());
if (umAggregator)
{
fRowGroupIn.setData(fAggregator->getOutputRowGroup()->getRGData());
Row rowIn;
fRowGroupIn.initRow(&rowIn);
fRowGroupIn.getRow(0, &rowIn);
for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i, rowIn.nextRow())
while (umAggregator->nextOutputRowGroup())
{
aggregateRow(rowIn);
fRowGroupIn.setData(fAggregator->getOutputRowGroup()->getRGData());
Row rowIn;
fRowGroupIn.initRow(&rowIn);
fRowGroupIn.getRow(0, &rowIn);
for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i, rowIn.nextRow())
{
aggregateRow(rowIn);
}
}
}
else
{
std::ostringstream errmsg;
errmsg << "RowAggregationDistinct: incorrect fAggregator class.";
cerr << errmsg.str() << endl;
}
}
void RowAggregationDistinct::doDistinctAggregation_rowVec(vector<std::pair<Row::Pointer, uint64_t>>& inRows)

View File

@ -681,6 +681,17 @@ class RowAggregationUM : public RowAggregation
*/
bool nextRowGroup();
/** @brief Returns aggregated rows in a RowGroup as long as there are result RowGroups that have not been returned yet.
*
* This function should be called repeatedly until false is returned (meaning end of data).
* Returns data from in-memory storage, as well as spilled data from disk. If disk-based aggregation is
* happening, finalAggregation() should be called before returning result RowGroups to finalize the used
* RowAggStorages, merge different spilled generations and obtain correct aggregation results.
*
* @returns True if there are more result RowGroups, else false if all results have been returned.
*/
bool nextOutputRowGroup();
/** @brief Add an aggregator for DISTINCT aggregation
*/
void distinctAggregator(const boost::shared_ptr<RowAggregation>& da)

View File

@ -18,6 +18,7 @@
#include <unistd.h>
#include <sys/stat.h>
#include <boost/filesystem.hpp>
#include <cstdint>
#include "rowgroup.h"
#include <resourcemanager.h>
#include <fcntl.h>
@ -79,6 +80,11 @@ std::string errorString(int errNo)
auto* buf = strerror_r(errNo, tmp, sizeof(tmp));
return {buf};
}
/** @brief Locate the least significant set bit of a 64-bit mask.
 *
 * @param mask Bit mask to scan
 * @returns One plus the zero-based index of the lowest set bit, or 0 if no bit is set
 */
size_t findFirstSetBit(const uint64_t mask)
{
  if (mask == 0)
  {
    return 0;
  }
  // Number of trailing zero bits == zero-based index of the lowest set bit.
  return static_cast<size_t>(__builtin_ctzll(mask)) + 1;
}
} // anonymous namespace
namespace rowgroup
@ -552,7 +558,7 @@ class Dumper
class RowGroupStorage
{
public:
using RGDataStorage = std::vector<std::unique_ptr<RGData>>;
using RGDataStorage = std::vector<RGDataUnPtr>;
public:
/** @brief Default constructor
@ -613,6 +619,54 @@ class RowGroupStorage
return fRowGroupOut->getSizeWithStrings(fMaxRows);
}
/** @brief Compact the not finalized rows of an RGData towards its front.
 *
 * Walks the fFinalizedRows bitmap words [fgid, tgid) that describe `rgdata`. In the
 * inverted word (`mask`) a set bit marks a row that is NOT finalized. Every contiguous
 * run of such rows is handed to moveRows() so that the kept rows end up packed from
 * row 0 onwards.
 * NOTE(review): moveRows(rgdata, to, from, count) argument meaning is inferred from the
 * call sites here - confirm against its definition.
 *
 * @param rgdata RGData whose rows are compacted in place
 * @param fgid   Index of the first fFinalizedRows word belonging to this RGData
 * @param tgid   One past the last fFinalizedRows word belonging to this RGData
 * @returns {pos, opos}: pos is the number of rows kept (callers use it as the new row
 *          count); opos is the source cursor left over from the last processed word
 */
PosOpos shiftRowsInRowGroup(RGDataUnPtr& rgdata, uint64_t fgid, uint64_t tgid)
{
  uint64_t pos = 0;
  uint64_t opos = 0;

  fRowGroupOut->setData(rgdata.get());
  for (auto i = fgid; i < tgid; ++i)
  {
    // This bitmap word starts past the last row actually stored in the row group.
    if ((i - fgid) * HashMaskElements >= fRowGroupOut->getRowCount())
      break;

    uint64_t mask = ~fFinalizedRows[i];
    if ((i - fgid + 1) * HashMaskElements > fRowGroupOut->getRowCount())
    {
      // Last, partially used word: clear the bits that lie beyond the row count.
      // The shift amount is in [1, 63] here because the word starts below the row count.
      mask &= (~0ULL) >> ((i - fgid + 1) * HashMaskElements - fRowGroupOut->getRowCount());
    }

    opos = (i - fgid) * HashMaskElements;
    if (mask == ~0ULL)
    {
      // No row of this word is finalized: move the whole word's worth of rows at once.
      if (LIKELY(pos != opos))
        moveRows(rgdata.get(), pos, opos, HashMaskElements);
      pos += HashMaskElements;
      continue;
    }

    if (mask == 0)
      continue;

    while (mask != 0)
    {
      // b: 1-based position of the first not finalized row in the remaining mask.
      const size_t b = findFirstSetBit(mask);
      // Bits above position b. Shifting a 64-bit value by 64 is undefined behaviour,
      // so when the run starts at the topmost bit (b == 64) nothing is left above it.
      const uint64_t above = (b < HashMaskElements) ? (mask >> b) : 0ULL;
      // e: 1-based position of the first finalized row after the run, so the run is e - b rows long.
      const size_t e = findFirstSetBit(~above) + b;
      if (UNLIKELY(e >= HashMaskElements))
        mask = 0;
      else
        mask >>= e;
      if (LIKELY(pos != opos + b - 1))
        moveRows(rgdata.get(), pos, opos + b - 1, e - b);
      pos += e - b;
      opos += e;
    }
    --opos;
  }
  return {pos, opos};
}
/** @brief Take away RGDatas from another RowGroupStorage
*
* If some of the RGDatas is not in the memory do not load them,
@ -626,7 +680,7 @@ class RowGroupStorage
}
void append(RowGroupStorage* o)
{
std::unique_ptr<RGData> rgd;
RGDataUnPtr rgd;
std::string ofname;
while (o->getNextRGData(rgd, ofname))
{
@ -667,11 +721,130 @@ class RowGroupStorage
}
}
/** @brief Detach the last entry of fRGDatas and hand it to the caller.
 *
 * fRGDatas must not be empty (asserted).
 *
 * @param rgdata(out) Receives the RGData pointer that was stored last in fRGDatas
 * @returns Index the entry had in fRGDatas before it was removed
 */
uint64_t getLastRGData(RGDataUnPtr& rgdata)
{
  assert(!fRGDatas.empty());
  const uint64_t lastIdx = fRGDatas.size() - 1;
  rgdata = std::move(fRGDatas.back());
  fRGDatas.pop_back();
  return lastIdx;
}
/** @brief Compute the range of fFinalizedRows bitmap words that describe one RGData.
 *
 * Each bitmap word covers HashMaskElements rows.
 *
 * @param rgid     Index of the RGData
 * @param fMaxRows Number of rows per RGData used for the calculation
 * @returns {fgid, tgid}: index of the first word and one past the last word
 */
static FgidTgid calculateGids(const uint64_t rgid, const uint64_t fMaxRows)
{
  const uint64_t firstWord = rgid * fMaxRows / HashMaskElements;
  const uint64_t wordsPerRGData = fMaxRows / HashMaskElements;
  return FgidTgid{firstWord, firstWord + wordsPerRGData};
}
/** @brief Hand out the aggregation results of the current generation, one RGData per call.
 *
 * RGDatas are taken from the back of fRGDatas and loaded from disk when they are not in
 * memory. Finalized rows are skipped as they would contain duplicate results: the
 * remaining rows are compacted to the start of the RGData and the row count is set to the
 * number of rows kept. RGDatas in which every row is finalized are dropped and the search
 * continues with the next one.
 *
 * @param rgdata(out) Receives the next RGData to output
 * @returns true if `rgdata` holds an RGData to output, false if there are no more RGDatas
 *          in this generation
 */
bool getNextOutputRGData(RGDataUnPtr& rgdata)
{
  if (UNLIKELY(fRGDatas.empty()))
  {
    fMM->release();
    return false;
  }

  while (!fRGDatas.empty())
  {
    auto rgid = getLastRGData(rgdata);
    auto [fgid, tgid] = calculateGids(rgid, fMaxRows);

    // The finalized-rows bitmap does not reach this RGData, so none of its rows are
    // finalized and it can be returned untouched (loading it and unlinking the dump
    // file first if it is not in memory).
    if (fFinalizedRows.size() <= fgid)
    {
      if (!rgdata)
      {
        loadRG(rgid, rgdata, true);
      }
      return true;
    }

    if (tgid >= fFinalizedRows.size())
      fFinalizedRows.resize(tgid + 1, 0ULL);

    // Does at least one bitmap word of this RGData contain a not finalized row?
    bool hasReturnRows = false;
    for (auto i = fgid; i < tgid && !hasReturnRows; ++i)
    {
      hasReturnRows = (fFinalizedRows[i] != ~0ULL);
    }

    if (!hasReturnRows)
    {
      // Every row is finalized: nothing to output from this RGData. If it only lives
      // on disk, unlink its dump file before moving on.
      if (!rgdata)
      {
        unlink(makeRGFilename(rgid).c_str());
      }
      continue;
    }

    if (!rgdata)
    {
      // Load RGData from disk and unlink the dump file.
      loadRG(rgid, rgdata, true);
    }

    const uint64_t keptRows = shiftRowsInRowGroup(rgdata, fgid, tgid).first;

    // Nothing was kept, so all rows turned out to be finalized: remove the RGData and
    // its file and do not give it out.
    if (keptRows == 0)
    {
      fLRU->remove(rgid);
      unlink(makeRGFilename(rgid).c_str());
      continue;
    }

    // Expose only the not finalized rows that were compacted at the front.
    fRowGroupOut->setData(rgdata.get());
    fRowGroupOut->setRowCount(keptRows);

    // Release the memory accounted for this rgdata from the MemoryManager.
    int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
    fMM->release(memSz);
    unlink(makeRGFilename(rgid).c_str());
    fLRU->remove(rgid);
    return true;
  }

  return false;
}
/** @brief Returns next RGData, load it from disk if necessary.
*
* @returns pointer to the next RGData or empty pointer if there is nothing
*/
std::unique_ptr<RGData> getNextRGData()
RGDataUnPtr getNextRGData()
{
while (!fRGDatas.empty())
{
@ -1031,7 +1204,7 @@ class RowGroupStorage
* @param fname(out) Filename of the dump if it's not in the memory
* @returns true if there is available RGData
*/
bool getNextRGData(std::unique_ptr<RGData>& rgdata, std::string& fname)
bool getNextRGData(RGDataUnPtr& rgdata, std::string& fname)
{
if (UNLIKELY(fRGDatas.empty()))
{
@ -1040,12 +1213,9 @@ class RowGroupStorage
}
while (!fRGDatas.empty())
{
uint64_t rgid = fRGDatas.size() - 1;
rgdata = std::move(fRGDatas[rgid]);
fRGDatas.pop_back();
auto rgid = getLastRGData(rgdata);
auto [fgid, tgid] = calculateGids(rgid, fMaxRows);
uint64_t fgid = rgid * fMaxRows / 64;
uint64_t tgid = fgid + fMaxRows / 64;
if (fFinalizedRows.size() > fgid)
{
if (tgid >= fFinalizedRows.size())
@ -1069,45 +1239,7 @@ class RowGroupStorage
continue;
}
uint64_t pos = 0;
uint64_t opos = 0;
fRowGroupOut->setData(rgdata.get());
for (auto i = fgid; i < tgid; ++i)
{
if ((i - fgid) * 64 >= fRowGroupOut->getRowCount())
break;
uint64_t mask = ~fFinalizedRows[i];
if ((i - fgid + 1) * 64 > fRowGroupOut->getRowCount())
{
mask &= (~0ULL) >> ((i - fgid + 1) * 64 - fRowGroupOut->getRowCount());
}
opos = (i - fgid) * 64;
if (mask == ~0ULL)
{
if (LIKELY(pos != opos))
moveRows(rgdata.get(), pos, opos, 64);
pos += 64;
continue;
}
if (mask == 0)
continue;
while (mask != 0)
{
size_t b = __builtin_ffsll(mask);
size_t e = __builtin_ffsll(~(mask >> b)) + b;
if (UNLIKELY(e >= 64))
mask = 0;
else
mask >>= e;
if (LIKELY(pos != opos + b - 1))
moveRows(rgdata.get(), pos, opos + b - 1, e - b);
pos += e - b;
opos += e;
}
--opos;
}
auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid);
if (pos == 0)
{
@ -1120,6 +1252,7 @@ class RowGroupStorage
fRowGroupOut->setRowCount(pos);
}
// Release the memory used by the current rgdata.
if (rgdata)
{
fRowGroupOut->setData(rgdata.get());
@ -1131,6 +1264,7 @@ class RowGroupStorage
{
fname = makeRGFilename(rgid);
}
// to periodically clean up freed memory so it can be used by other threads.
fLRU->remove(rgid);
return true;
}
@ -1170,7 +1304,7 @@ class RowGroupStorage
loadRG(rgid, fRGDatas[rgid]);
}
void loadRG(uint64_t rgid, std::unique_ptr<RGData>& rgdata, bool unlinkDump = false)
void loadRG(uint64_t rgid, RGDataUnPtr& rgdata, bool unlinkDump = false)
{
auto fname = makeRGFilename(rgid);
@ -1738,7 +1872,7 @@ void RowAggStorage::append(RowAggStorage& other)
}
}
std::unique_ptr<RGData> RowAggStorage::getNextRGData()
RGDataUnPtr RowAggStorage::getNextRGData()
{
if (!fStorage)
{
@ -1749,6 +1883,43 @@ std::unique_ptr<RGData> RowAggStorage::getNextRGData()
return fStorage->getNextRGData();
}
// Returns the next output RGData in `rgdata`, walking from the current generation down
// to generation 0. When the current generation's storage has nothing left, fGeneration is
// decremented and fStorage is replaced by a clone for that earlier generation.
// Returns true if `rgdata` was filled (fRowGroupOut is pointed at it), false if there is
// no storage or all generations have been emptied.
bool RowAggStorage::getNextOutputRGData(RGDataUnPtr& rgdata)
{
  if (!fStorage)
  {
    return false;
  }

  cleanup();
  freeData();

  // fGeneration is unsigned; a signed counter is used for the >= 0 loop condition.
  for (int32_t remaining = fGeneration; remaining >= 0; --remaining)
  {
    if (fStorage->getNextOutputRGData(rgdata))
    {
      fRowGroupOut->setData(rgdata.get());
      return true;
    }

    // All generations have been emptied.
    if (fGeneration == 0)
    {
      return false;
    }

    // The current generation has no more RGDatas to return:
    // switch to the earlier generation and continue with its RGDatas.
    --fGeneration;
    fStorage.reset(fStorage->clone(fGeneration));
  }

  return false;
}
void RowAggStorage::freeData()
{
for (auto& data : fGens)

View File

@ -20,6 +20,7 @@
#include "resourcemanager.h"
#include "rowgroup.h"
#include "idbcompress.h"
#include <cstdint>
#include <random>
#include <sys/stat.h>
#include <unistd.h>
@ -35,10 +36,15 @@ class RowPosHashStorage;
using RowPosHashStoragePtr = std::unique_ptr<RowPosHashStorage>;
class RowGroupStorage;
using RGDataUnPtr = std::unique_ptr<RGData>;
using PosOpos = std::pair<uint64_t, uint64_t>;
using FgidTgid = std::pair<uint64_t, uint64_t>;
uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol);
constexpr const size_t MaxConstStrSize = 2048ULL;
constexpr const size_t MaxConstStrBufSize = MaxConstStrSize << 1;
constexpr const uint64_t HashMaskElements = 64ULL;
class RowAggStorage
{
@ -97,6 +103,12 @@ class RowAggStorage
*/
std::unique_ptr<RGData> getNextRGData();
/** @brief Remove last RGData from in-memory storage or disk.
* Iterates over all generations on disk if available.
* @returns True if RGData is returned in parameter or false if no more RGDatas can be returned.
*/
bool getNextOutputRGData(std::unique_ptr<RGData>& rgdata);
/** @brief TODO
*
* @param mergeFunc