mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
Revert "fix(aggregation, disk-based) MCOL-5691 distinct aggregate disk based (#3145)"
This reverts commit c7caa4374f.
@@ -291,7 +291,6 @@ TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup
 fNumOfBuckets =
 calcNumberOfBuckets(memLimit, fNumOfThreads, fNumOfBuckets, fNumOfRowGroups, fRowGroupIn.getRowSize(),
 fRowGroupOut.getRowSize(), fRm->getAllowDiskAggregation());
-
 fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets);

 fMemUsage.reset(new uint64_t[fNumOfThreads]);
@@ -395,7 +394,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
 rowGroupIn->initRow(&rowIn);
 auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());

-while (subDistAgg->nextOutputRowGroup())
+while (subDistAgg->nextRowGroup())
 {
 rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
 rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
@@ -419,7 +418,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
 rowGroupIn->initRow(&rowIn);
 auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());

-while (subAgg->nextOutputRowGroup())
+while (subAgg->nextRowGroup())
 {
 rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
 rgDataVec.emplace_back(subAgg->moveCurrentRGData());
@@ -574,7 +573,7 @@ bool TupleAggregateStep::nextDeliveredRowGroup()
 {
 for (; fBucketNum < fNumOfBuckets; fBucketNum++)
 {
-while (fAggregators[fBucketNum]->nextOutputRowGroup())
+while (fAggregators[fBucketNum]->nextRowGroup())
 {
 fAggregators[fBucketNum]->finalize();
 fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
@@ -5704,27 +5703,14 @@ void TupleAggregateStep::doAggregate()
 return;
 }

-/** @brief Aggregate input row groups in two-phase multi-threaded aggregation.
-* In second phase handle three different aggregation cases differently:
-* 1. Query contains at least one aggregation on a DISTINCT column, e.g. SUM (DISTINCT col1) AND at least one
-* GROUP BY column
-* 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column
-* 3. Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column
-* DISTINCT selects (e.g. SELECT DISTINCT col1 FROM ...) are handled in tupleannexstep.cpp.
-*/
 uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp)
 {
-// initialize return value variable
+uint32_t i;
+RGData rgData;
 uint64_t rowCount = 0;

 try
 {
-/*
-* Phase 1: Distribute input rows to different buckets depending on the hash value of the group by columns
-* per row. Then distribute buckets equally on aggregators in fAggregators. (Number of fAggregators ==
-* fNumOfBuckets). Each previously created hash bucket is represented as one RowGroup in a fAggregator.
-*/
-
 if (!fDoneAggregate)
 {
 initializeMultiThread();
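The doc comment removed above describes the two-phase plan: phase 1 hashes every input row on its GROUP BY columns and routes it to one of fNumOfBuckets bucket aggregators, and phase 2 then aggregates each bucket independently. A minimal sketch of that bucketing idea, using placeholder types rather than the ColumnStore classes:

    #include <cstdint>
    #include <cstdio>
    #include <functional>
    #include <vector>

    // Illustrative only: phase 1 distributes rows across per-bucket aggregators
    // by hashing the group-by key, so each bucket can be aggregated independently.
    struct Row { uint64_t groupKey; int64_t value; };

    struct BucketAggregator                    // stand-in for one fAggregators[i]
    {
      std::vector<Row> rows;
      void addRow(const Row& r) { rows.push_back(r); }
    };

    void distributeToBuckets(const std::vector<Row>& input,
                             std::vector<BucketAggregator>& buckets)
    {
      const size_t numBuckets = buckets.size();
      for (const Row& r : input)
      {
        // Rows with equal group-by keys always hash to the same bucket, so the
        // second phase never has to merge groups across buckets.
        size_t b = std::hash<uint64_t>{}(r.groupKey) % numBuckets;
        buckets[b].addRow(r);
      }
    }

    int main()
    {
      std::vector<Row> input = {{1, 10}, {2, 20}, {1, 30}, {3, 40}};
      std::vector<BucketAggregator> buckets(4);
      distributeToBuckets(input, buckets);
      for (size_t i = 0; i < buckets.size(); ++i)
        std::printf("bucket %zu: %zu rows\n", i, buckets[i].rows.size());
      return 0;
    }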
@@ -5733,9 +5719,9 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
 runners.reserve(fNumOfThreads); // to prevent a resize during use

 // Start the aggregator threads
-for (uint32_t threadNum = 0; threadNum < fNumOfThreads; threadNum++)
+for (i = 0; i < fNumOfThreads; i++)
 {
-runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, threadNum)));
+runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
 }

 // Now wait for all those threads
@@ -5749,28 +5735,18 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
 // much memory on average
 uint32_t threads = std::max(1U, fNumOfThreads / 2);
 runners.reserve(threads);
-for (uint32_t threadNum = 0; threadNum < threads; ++threadNum)
+for (i = 0; i < threads; ++i)
 {
-runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, threadNum)));
+runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, i)));
 }
 jobstepThreadPool.join(runners);
 }

-/*
+if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)
-* Phase 2: Depending on query type (see below) do aggregation per previously created RowGroup of rows
-* that need to aggregated and output results.
-*/
-
-auto* distinctAggregator = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
-const bool hasGroupByColumns = fAggregator->aggMapKeyLength() > 0;
-
-// Case 1: Query contains at least one aggregation on a DISTINCT column AND at least one GROUP BY column
-// e.g. SELECT SUM(DISTINCT col1) FROM test GROUP BY col2;
-if (distinctAggregator && hasGroupByColumns)
 {
+// 2nd phase multi-threaded aggregate
 if (!fEndOfResult)
 {
-// Do multi-threaded second phase aggregation (per row group created for GROUP BY statement)
 if (!fDoneAggregate)
 {
 vector<uint64_t> runners; // thread pool handles
@@ -5778,143 +5754,132 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp

 uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads;
 uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ? fNumOfThreads : fNumOfThreads + 1);
+// uint32_t bucketsPerThread = 1;
+// uint32_t numThreads = fNumOfBuckets;

 runners.reserve(numThreads);

-for (uint32_t threadNum = 0; threadNum < numThreads; threadNum++)
+for (i = 0; i < numThreads; i++)
 {
 runners.push_back(jobstepThreadPool.invoke(
-ThreadedSecondPhaseAggregator(this, threadNum * bucketsPerThread, bucketsPerThread)));
+ThreadedSecondPhaseAggregator(this, i * bucketsPerThread, bucketsPerThread)));
 }

 jobstepThreadPool.join(runners);
 }

-// Deliver results
 fDoneAggregate = true;
 bool done = true;
-while (nextDeliveredRowGroup() && !cancelled())
+while (nextDeliveredRowGroup())
 {
 done = false;
 rowCount = fRowGroupOut.getRowCount();

 if (rowCount != 0)
 {
-if (!cleanUpAndOutputRowGroup(bs, dlp))
+if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
+pruneAuxColumns();
+
+if (dlp)
+{
+rgData = fRowGroupDelivered.duplicate();
+dlp->insert(rgData);
+}
+else
+{
+bs.restart();
+fRowGroupDelivered.serializeRGData(bs);
 break;
 }
+}
+
 done = true;
 }

 if (done)
-{
-fEndOfResult = true;
-}
-}
-}
-// Case 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column
-// e.g. SELECT SUM(DISTINCT col1) FROM test;
-else if (distinctAggregator)
-{
-if (!fEndOfResult)
-{
-if (!fDoneAggregate)
-{
-// Do aggregation over all row groups. As all row groups need to be aggregated together there is no
-// easy way of multi-threading this and it's done in a single thread for now.
-for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; bucketNum++)
-{
-if (fEndOfResult == false)
-{
-// The distinctAggregator accumulates the aggregation results of all row groups by being added
-// all row groups of each bucket aggregator and doing an aggregation step after each addition.
-auto* bucketMultiDistinctAggregator =
-dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[bucketNum].get());
-auto* bucketDistinctAggregator =
-dynamic_cast<RowAggregationDistinct*>(fAggregators[bucketNum].get());
-distinctAggregator->aggregator(bucketDistinctAggregator->aggregator());
-
-if (bucketMultiDistinctAggregator)
-{
-(dynamic_cast<RowAggregationMultiDistinct*>(distinctAggregator))
-->subAggregators(bucketMultiDistinctAggregator->subAggregators());
-}
-
-distinctAggregator->aggregator()->finalAggregation();
-distinctAggregator->doDistinctAggregation();
-}
-}
-}
-
-// Deliver results
-fDoneAggregate = true;
-bool done = true;
-while (fAggregator->nextRowGroup() && !cancelled())
-{
-done = false;
-fAggregator->finalize();
-rowCount = fRowGroupOut.getRowCount();
-fRowsReturned += rowCount;
-fRowGroupDelivered.setData(fRowGroupOut.getRGData());
-
-if (rowCount != 0)
-{
-if (!cleanUpAndOutputRowGroup(bs, dlp))
-break;
-}
-done = true;
-}
-if (done)
-fEndOfResult = true;
-}
-}
-// CASE 3: Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column
-// e.g. SELECT SUM(col1) FROM test GROUP BY col2;
-// Do aggregation over all row groups. As all row groups need to be aggregated together there is no
-// easy way of multi-threading this and it's done in a single thread for now.
-else if (hasGroupByColumns)
-{
-if (!fEndOfResult && !fDoneAggregate)
-{
-for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; ++bucketNum)
-{
-fAggregator->append(fAggregators[bucketNum].get());
-}
-}
-
-fDoneAggregate = true;
-bool done = true;
-
-while (fAggregator->nextRowGroup() && !cancelled())
-{
-done = false;
-fAggregator->finalize();
-rowCount = fRowGroupOut.getRowCount();
-fRowsReturned += rowCount;
-fRowGroupDelivered.setData(fRowGroupOut.getRGData());
-
-if (rowCount != 0)
-{
-if (!cleanUpAndOutputRowGroup(bs, dlp))
-break;
-}
-done = true;
-}
-
-if (done)
-{
 fEndOfResult = true;
 }
 }
 else
 {
-throw logic_error(
+auto* agg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
-"TupleAggregateStep::doThreadedAggregate: No DISTINCT columns nested into aggregation function "
-"or "
+if (!fEndOfResult)
-"GROUP BY columns found. Should not reach here.");
+{
+if (!fDoneAggregate)
+{
+for (i = 0; i < fNumOfBuckets; i++)
+{
+if (fEndOfResult == false)
+{
+// do the final aggregtion and deliver the results
+// at least one RowGroup for aggregate results
+// for "distinct without group by" case
+if (agg != nullptr)
+{
+auto* aggMultiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[i].get());
+auto* aggDist = dynamic_cast<RowAggregationDistinct*>(fAggregators[i].get());
+agg->aggregator(aggDist->aggregator());
+
+if (aggMultiDist)
+{
+(dynamic_cast<RowAggregationMultiDistinct*>(agg))
+->subAggregators(aggMultiDist->subAggregators());
+}
+
+agg->doDistinctAggregation();
+}
+// for "group by without distinct" case
+else
+{
+fAggregator->append(fAggregators[i].get());
 }
 }
+}
+}
+
+fDoneAggregate = true;
+}
+
+bool done = true;
+
+//@bug4459
+while (fAggregator->nextRowGroup() && !cancelled())
+{
+done = false;
+fAggregator->finalize();
+rowCount = fRowGroupOut.getRowCount();
+fRowsReturned += rowCount;
+fRowGroupDelivered.setData(fRowGroupOut.getRGData());
+
+if (rowCount != 0)
+{
+if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
+pruneAuxColumns();
+
+if (dlp)
+{
+rgData = fRowGroupDelivered.duplicate();
+dlp->insert(rgData);
+}
+else
+{
+bs.restart();
+fRowGroupDelivered.serializeRGData(bs);
+break;
+}
+}
+
+done = true;
+}
+
+if (done)
+{
+fEndOfResult = true;
+}
+}
+} // try
 catch (...)
 {
 handleException(std::current_exception(), logging::tupleAggregateStepErr,
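In the branch restored above, the final merge decides per bucket whether it holds a DISTINCT aggregation or a plain GROUP BY by applying dynamic_cast to the bucket aggregator, then either rewires the distinct aggregator or appends the bucket's rows. A reduced sketch of that dispatch pattern, with hypothetical class names:

    #include <cstdio>
    #include <memory>
    #include <vector>

    // Illustrative only: pick the per-bucket code path by dynamic_cast on the
    // aggregator's concrete type, roughly as the restored merge step does.
    struct Agg { virtual ~Agg() = default; };
    struct DistinctAgg : Agg { void mergeDistinct() { std::puts("distinct merge"); } };
    struct GroupByAgg : Agg { void append() { std::puts("group-by append"); } };

    void finalMerge(const std::vector<std::unique_ptr<Agg>>& buckets)
    {
      for (const auto& b : buckets)
      {
        if (auto* d = dynamic_cast<DistinctAgg*>(b.get()))
          d->mergeDistinct();              // "distinct without group by" case
        else if (auto* g = dynamic_cast<GroupByAgg*>(b.get()))
          g->append();                     // "group by without distinct" case
      }
    }

    int main()
    {
      std::vector<std::unique_ptr<Agg>> buckets;
      buckets.push_back(std::make_unique<DistinctAgg>());
      buckets.push_back(std::make_unique<GroupByAgg>());
      finalMerge(buckets);
      return 0;
    }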
@@ -5954,23 +5919,6 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
 return rowCount;
 }

-bool TupleAggregateStep::cleanUpAndOutputRowGroup(ByteStream& bs, RowGroupDL* dlp)
-{
-if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
-pruneAuxColumns();
-
-if (dlp)
-{
-RGData rgData = fRowGroupDelivered.duplicate();
-dlp->insert(rgData);
-return true;
-}
-
-bs.restart();
-fRowGroupDelivered.serializeRGData(bs);
-return false;
-}
-
 void TupleAggregateStep::pruneAuxColumns()
 {
 uint64_t rowCount = fRowGroupOut.getRowCount();
@@ -161,7 +161,6 @@ class TupleAggregateStep : public JobStep, public TupleDeliveryStep
 void doThreadedSecondPhaseAggregate(uint32_t threadID);
 bool nextDeliveredRowGroup();
 void pruneAuxColumns();
-bool cleanUpAndOutputRowGroup(messageqcpp::ByteStream& bs, RowGroupDL* dlp);
 void formatMiniStats();
 void printCalTrace();
 template <class GroupByMap>
@@ -57,7 +57,7 @@
 #include "rowstorage.h"

 //..comment out NDEBUG to enable assertions, uncomment NDEBUG to disable
-// #define NDEBUG
+//#define NDEBUG
 #include "mcs_decimal.h"

 using namespace std;
@@ -315,7 +315,7 @@ void RowAggregation::updateStringMinMax(utils::NullString val1, utils::NullStrin
 if (val1.isNull())
 {
 // as any comparison with NULL is false, it should not affect min/max ranges.
-return; // do nothing.
+return ; // do nothing.
 }
 CHARSET_INFO* cs = fRow.getCharset(col);
 int tmp = cs->strnncoll(val1.str(), val1.length(), val2.str(), val2.length());
@@ -810,8 +810,7 @@ void RowAggregation::aggregateRow(Row& row, const uint64_t* hash,
 std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
 {
 uint32_t cnt = fRollupFlag ? fGroupByCols.size() : 1;
-for (uint32_t z = 0; z < cnt; z++)
+for (uint32_t z = 0; z < cnt; z++) {
-{
 // groupby column list is not empty, find the entry.
 if (!fGroupByCols.empty())
 {
@@ -857,8 +856,7 @@ void RowAggregation::aggregateRow(Row& row, const uint64_t* hash,
 updateEntry(row, rgContextColl);
 // these quantities are unsigned and comparing z and cnt - 1 can be incorrect
 // because cnt can be zero.
-if ((z + 1 < cnt))
+if ((z + 1 < cnt)) {
-{
 // if we are rolling up, we mark appropriate field as NULL and also increment
 // value in the "mark" column, so that we can differentiate between data and
 // various rollups.
@@ -1171,8 +1169,8 @@ void RowAggregation::doMinMax(const Row& rowIn, int64_t colIn, int64_t colOut, i
 {
 if (LIKELY(rowIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
 {
-updateIntMinMax(rowIn.getTSInt128Field(colIn).getValue(), fRow.getTSInt128Field(colOut).getValue(),
+updateIntMinMax(rowIn.getTSInt128Field(colIn).getValue(), fRow.getTSInt128Field(colOut).getValue(), colOut,
-colOut, funcType);
+funcType);
 }
 else if (rowIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
 {
@@ -2122,9 +2120,10 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
 long double mean = fRow.getLongDoubleField(colAux);
 long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1);
 volatile long double delta = valIn - mean;
-mean += delta / count;
+mean += delta/count;
 scaledMomentum2 += delta * (valIn - mean);

+
 fRow.setDoubleField(count, colOut);
 fRow.setLongDoubleField(mean, colAux);
 fRow.setLongDoubleField(scaledMomentum2, colAux + 1);
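The update kept in doStatistics() above is Welford's single-pass recurrence: delta = x - mean, mean += delta / count, M2 += delta * (x - mean). A small self-contained version of the same recurrence, with placeholder types instead of the Row/RowGroup API:

    #include <cstdint>
    #include <cstdio>

    // Illustrative only: Welford's single-pass update for the mean and the
    // scaled second moment M2 ("scaledMomentum2"); variance = M2 / n (population).
    struct RunningStats
    {
      uint64_t n = 0;
      long double mean = 0.0L;
      long double m2 = 0.0L;

      void add(long double x)
      {
        ++n;
        long double delta = x - mean;
        mean += delta / n;
        m2 += delta * (x - mean);  // uses the *updated* mean, as in the diff above
      }
    };

    int main()
    {
      RunningStats s;
      for (long double x : {2.0L, 4.0L, 4.0L, 4.0L, 5.0L, 5.0L, 7.0L, 9.0L})
        s.add(x);
      std::printf("mean=%Lf var=%Lf\n", s.mean, s.m2 / s.n);  // mean 5, variance 4
      return 0;
    }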
@@ -2174,7 +2173,8 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int
 cc = dynamic_cast<execplan::ConstantColumn*>(fFunctionCols[funcColsIdx]->fpConstCol.get());
 }

-if ((cc && cc->isNull()) || (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
+if ((cc && cc->isNull()) ||
+(!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
 {
 if (udafContextsColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
 {
@@ -2500,8 +2500,7 @@ void RowAggregation::loadEmptySet(messageqcpp::ByteStream& bs)
 //------------------------------------------------------------------------------
 RowAggregationUM::RowAggregationUM(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
 const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
-joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit,
+joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit, bool withRollup)
-bool withRollup)
 : RowAggregation(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup)
 , fHasAvg(false)
 , fHasStatsFunc(false)
@@ -4221,26 +4220,13 @@ bool RowAggregationUM::nextRowGroup()
 return more;
 }

-bool RowAggregationUM::nextOutputRowGroup()
-{
-bool more = fRowAggStorage->getNextOutputRGData(fCurRGData);
-
-if (more)
-{
-fRowGroupOut->setData(fCurRGData.get());
-}
-
-return more;
-}
-
 //------------------------------------------------------------------------------
 // Row Aggregation constructor used on UM
 // For 2nd phase of two-phase case, from partial RG to final aggregated RG
 //------------------------------------------------------------------------------
 RowAggregationUMP2::RowAggregationUMP2(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
 const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
-joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit,
+joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit, bool withRollup)
-bool withRollup)
 : RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup)
 {
 }
@@ -4464,8 +4450,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut,
 {
 if (LIKELY(cnt > 0))
 {
-int128_t valOut = fRow.getTSInt128Field(colOut).getValue();
+int128_t valOut = fRow.getTSInt128Field(colOut).getValue();;
-;
 int128_t sum = valOut + wideValue;
 fRow.setInt128Field(sum, colOut);
 fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
@@ -4524,8 +4509,7 @@ void RowAggregationUMP2::doStatistics(const Row& rowIn, int64_t colIn, int64_t c
 {
 volatile long double delta = mean - blockMean;
 nextMean = (mean * count + blockMean * blockCount) / nextCount;
-nextScaledMomentum2 =
+nextScaledMomentum2 = scaledMomentum2 + blockScaledMomentum2 + delta * delta * (count * blockCount / nextCount);
-scaledMomentum2 + blockScaledMomentum2 + delta * delta * (count * blockCount / nextCount);
 }
 fRow.setDoubleField(nextCount, colOut);
 fRow.setLongDoubleField(nextMean, colAux);
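The second-phase variant above merges two partial results per call instead of adding a single value: with delta = meanA - meanB, the combined mean is (nA*meanA + nB*meanB) / n and the combined M2 is M2A + M2B + delta^2 * (nA*nB / n). A compact sketch of that combination step, using an assumed helper struct:

    #include <cstdio>

    // Illustrative only: combine two partial (count, mean, M2) triples, as the
    // restored doStatistics() line does for per-block statistics.
    struct Partial { long double n, mean, m2; };

    Partial merge(const Partial& a, const Partial& b)
    {
      long double n = a.n + b.n;
      long double delta = a.mean - b.mean;
      long double mean = (a.mean * a.n + b.mean * b.n) / n;
      long double m2 = a.m2 + b.m2 + delta * delta * (a.n * b.n / n);
      return {n, mean, m2};
    }

    int main()
    {
      // {2,4,4,4} and {5,5,7,9} merged should match the single-pass result (mean 5, var 4).
      Partial left{4, 3.5L, 3.0L};
      Partial right{4, 6.5L, 11.0L};
      Partial all = merge(left, right);
      std::printf("n=%.0Lf mean=%Lf var=%Lf\n", all.n, all.mean, all.m2 / all.n);
      return 0;
    }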
@@ -4698,10 +4682,7 @@ void RowAggregationDistinct::addRowGroup(const RowGroup* pRows,
 //------------------------------------------------------------------------------
 void RowAggregationDistinct::doDistinctAggregation()
 {
-auto* umAggregator = dynamic_cast<RowAggregationUM*>(fAggregator.get());
+while (dynamic_cast<RowAggregationUM*>(fAggregator.get())->nextRowGroup())
-if (umAggregator)
-{
-while (umAggregator->nextOutputRowGroup())
 {
 fRowGroupIn.setData(fAggregator->getOutputRowGroup()->getRGData());

@@ -4714,13 +4695,6 @@ void RowAggregationDistinct::doDistinctAggregation()
 aggregateRow(rowIn);
 }
 }
-}
-else
-{
-std::ostringstream errmsg;
-errmsg << "RowAggregationDistinct: incorrect fAggregator class.";
-cerr << errmsg.str() << endl;
-}
 }

 void RowAggregationDistinct::doDistinctAggregation_rowVec(vector<std::pair<Row::Pointer, uint64_t>>& inRows)
@@ -681,17 +681,6 @@ class RowAggregationUM : public RowAggregation
 */
 bool nextRowGroup();

-/** @brief Returns aggregated rows in a RowGroup as long as there are still not returned result RowGroups.
-*
-* This function should be called repeatedly until false is returned (meaning end of data).
-* Returns data from in-memory storage, as well as spilled data from disk. If disk-based aggregation is
-* happening, finalAggregation() should be called before returning result RowGroups to finalize the used
-* RowAggStorages, merge different spilled generations and obtain correct aggregation results.
-*
-* @returns True if there are more result RowGroups, else false if all results have been returned.
-*/
-bool nextOutputRowGroup();
-
 /** @brief Add an aggregator for DISTINCT aggregation
 */
 void distinctAggregator(const boost::shared_ptr<RowAggregation>& da)
@@ -18,7 +18,6 @@
 #include <unistd.h>
 #include <sys/stat.h>
 #include <boost/filesystem.hpp>
-#include <cstdint>
 #include "rowgroup.h"
 #include <resourcemanager.h>
 #include <fcntl.h>
@@ -80,11 +79,6 @@ std::string errorString(int errNo)
 auto* buf = strerror_r(errNo, tmp, sizeof(tmp));
 return {buf};
 }
-
-size_t findFirstSetBit(const uint64_t mask)
-{
-return __builtin_ffsll(mask);
-}
 } // anonymous namespace

 namespace rowgroup
@@ -558,7 +552,7 @@ class Dumper
 class RowGroupStorage
 {
 public:
-using RGDataStorage = std::vector<RGDataUnPtr>;
+using RGDataStorage = std::vector<std::unique_ptr<RGData>>;

 public:
 /** @brief Default constructor
@@ -619,54 +613,6 @@ class RowGroupStorage
 return fRowGroupOut->getSizeWithStrings(fMaxRows);
 }

-// This shifts data within RGData such that it compacts the non finalized rows
-PosOpos shiftRowsInRowGroup(RGDataUnPtr& rgdata, uint64_t fgid, uint64_t tgid)
-{
-uint64_t pos = 0;
-uint64_t opos = 0;
-
-fRowGroupOut->setData(rgdata.get());
-for (auto i = fgid; i < tgid; ++i)
-{
-if ((i - fgid) * HashMaskElements >= fRowGroupOut->getRowCount())
-break;
-uint64_t mask = ~fFinalizedRows[i];
-if ((i - fgid + 1) * HashMaskElements > fRowGroupOut->getRowCount())
-{
-mask &= (~0ULL) >> ((i - fgid + 1) * HashMaskElements - fRowGroupOut->getRowCount());
-}
-opos = (i - fgid) * HashMaskElements;
-
-if (mask == ~0ULL)
-{
-if (LIKELY(pos != opos))
-moveRows(rgdata.get(), pos, opos, HashMaskElements);
-pos += HashMaskElements;
-continue;
-}
-
-if (mask == 0)
-continue;
-
-while (mask != 0)
-{
-// find position until block full of not finalized rows.
-size_t b = findFirstSetBit(mask);
-size_t e = findFirstSetBit(~(mask >> b)) + b;
-if (UNLIKELY(e >= HashMaskElements))
-mask = 0;
-else
-mask >>= e;
-if (LIKELY(pos != opos + b - 1))
-moveRows(rgdata.get(), pos, opos + b - 1, e - b);
-pos += e - b;
-opos += e;
-}
---opos;
-}
-return {pos, opos};
-}
-
 /** @brief Take away RGDatas from another RowGroupStorage
 *
 * If some of the RGDatas is not in the memory do not load them,
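The removed shiftRowsInRowGroup() compacts a RowGroup by scanning a 64-bit "finalized rows" bitmask and moving every run of still-active rows to the front; __builtin_ffsll finds the start of a run, and applied to the inverted shifted mask it finds where the run ends. A standalone sketch of that bit scan, with illustrative names:

    #include <cstdint>
    #include <cstdio>

    // Illustrative only: enumerate runs of set bits in a 64-bit mask using
    // __builtin_ffsll, mirroring the loop in the removed shiftRowsInRowGroup().
    void printRuns(uint64_t mask)
    {
      size_t base = 0;                          // bit offset consumed so far
      while (mask != 0)
      {
        size_t b = __builtin_ffsll(mask);       // 1-based index of first set bit
        size_t e = __builtin_ffsll(~(mask >> b)) + b;  // first clear bit after the run
        std::printf("run of %zu bits starting at bit %zu\n", e - b, base + b - 1);
        if (e >= 64)
          mask = 0;                             // run reaches the end of the word
        else
          mask >>= e;
        base += e;
      }
    }

    int main()
    {
      printRuns(0b0111000011110ULL);            // runs at bits 1..4 and 8..10
      return 0;
    }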
@@ -680,7 +626,7 @@ class RowGroupStorage
 }
 void append(RowGroupStorage* o)
 {
-RGDataUnPtr rgd;
+std::unique_ptr<RGData> rgd;
 std::string ofname;
 while (o->getNextRGData(rgd, ofname))
 {
@@ -720,130 +666,11 @@ class RowGroupStorage
 }
 }

-/** @brief Get the last RGData from fRGDatas, remove it from the vector and return its id.
-*
-* @param rgdata The RGData to be retrieved
-*/
-uint64_t getLastRGData(RGDataUnPtr& rgdata)
-{
-assert(!fRGDatas.empty());
-uint64_t rgid = fRGDatas.size() - 1;
-rgdata = std::move(fRGDatas[rgid]);
-fRGDatas.pop_back();
-return rgid;
-}
-
-static FgidTgid calculateGids(const uint64_t rgid, const uint64_t fMaxRows)
-{
-// Calculate from first and last uint64_t entry in fFinalizedRows BitMap
-// which contains information about rows in the RGData.
-uint64_t fgid = rgid * fMaxRows / HashMaskElements;
-uint64_t tgid = fgid + fMaxRows / HashMaskElements;
-return {fgid, tgid};
-}
-
-/** @brief Used to output aggregation results from memory and disk in the current generation in the form of
-* RGData. Returns next RGData, loads from disk if necessary. Skips finalized rows as they would contain
-* duplicate results, compacts actual rows into start of RGData and adapts number of rows transmitted in
-* RGData.
-* @returns A pointer to the next RGData or an empty pointer if there are no more RGDatas in this
-* generation.
-*/
-bool getNextOutputRGData(RGDataUnPtr& rgdata)
-{
-if (UNLIKELY(fRGDatas.empty()))
-{
-fMM->release();
-return false;
-}
-
-while (!fRGDatas.empty())
-{
-auto rgid = getLastRGData(rgdata);
-auto [fgid, tgid] = calculateGids(rgid, fMaxRows);
-
-if (fFinalizedRows.size() <= fgid)
-{
-// There are no finalized rows in this RGData. We can just return it.
-// Load from disk if necessary and unlink DumpFile.
-if (!rgdata)
-{
-loadRG(rgid, rgdata, true);
-}
-return true;
-}
-
-if (tgid >= fFinalizedRows.size())
-fFinalizedRows.resize(tgid + 1, 0ULL);
-
-// Check if there are rows to process
-bool hasReturnRows = false;
-for (auto i = fgid; i < tgid; ++i)
-{
-if (fFinalizedRows[i] != ~0ULL)
-{
-// Not all rows are finalized, we have to return at least parts of this RGData
-hasReturnRows = true;
-break;
-}
-}
-
-if (rgdata)
-{
-// RGData is currently in memory
-if (!hasReturnRows)
-{
-// All rows are finalized, don't return this RGData
-continue;
-}
-}
-else
-{
-if (hasReturnRows)
-{
-// Load RGData from disk, unlink dump file and continue processing
-loadRG(rgid, rgdata, true);
-}
-else
-{
-// All rows are finalized. Unlink dump file and continue search for return RGData
-unlink(makeRGFilename(rgid).c_str());
-continue;
-}
-}
-
-auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid);
-
-// Nothing got shifted at all -> all rows must be finalized. If all rows finalized remove
-// RGData and file and don't give it out.
-if (pos == 0)
-{
-fLRU->remove(rgid);
-unlink(makeRGFilename(rgid).c_str());
-continue;
-}
-
-// set RGData with number of not finalized rows which have been compacted at front of RGData
-fRowGroupOut->setData(rgdata.get());
-fRowGroupOut->setRowCount(pos);
-int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows);
-
-// Release the memory used by the current rgdata from this MemoryManager.
-fMM->release(memSz);
-unlink(makeRGFilename(rgid).c_str());
-
-// to periodically clean up freed memory so it can be used by other threads.
-fLRU->remove(rgid);
-return true;
-}
-return false;
-}
-
 /** @brief Returns next RGData, load it from disk if necessary.
 *
 * @returns pointer to the next RGData or empty pointer if there is nothing
 */
-RGDataUnPtr getNextRGData()
+std::unique_ptr<RGData> getNextRGData()
 {
 while (!fRGDatas.empty())
 {
|
|||||||
* @param fname(out) Filename of the dump if it's not in the memory
|
* @param fname(out) Filename of the dump if it's not in the memory
|
||||||
* @returns true if there is available RGData
|
* @returns true if there is available RGData
|
||||||
*/
|
*/
|
||||||
bool getNextRGData(RGDataUnPtr& rgdata, std::string& fname)
|
bool getNextRGData(std::unique_ptr<RGData>& rgdata, std::string& fname)
|
||||||
{
|
{
|
||||||
if (UNLIKELY(fRGDatas.empty()))
|
if (UNLIKELY(fRGDatas.empty()))
|
||||||
{
|
{
|
||||||
@@ -1212,9 +1039,12 @@ class RowGroupStorage
 }
 while (!fRGDatas.empty())
 {
-auto rgid = getLastRGData(rgdata);
+uint64_t rgid = fRGDatas.size() - 1;
-auto [fgid, tgid] = calculateGids(rgid, fMaxRows);
+rgdata = std::move(fRGDatas[rgid]);
+fRGDatas.pop_back();

+uint64_t fgid = rgid * fMaxRows / 64;
+uint64_t tgid = fgid + fMaxRows / 64;
 if (fFinalizedRows.size() > fgid)
 {
 if (tgid >= fFinalizedRows.size())
@@ -1238,7 +1068,45 @@ class RowGroupStorage
 continue;
 }

-auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid);
+uint64_t pos = 0;
+uint64_t opos = 0;
+fRowGroupOut->setData(rgdata.get());
+for (auto i = fgid; i < tgid; ++i)
+{
+if ((i - fgid) * 64 >= fRowGroupOut->getRowCount())
+break;
+uint64_t mask = ~fFinalizedRows[i];
+if ((i - fgid + 1) * 64 > fRowGroupOut->getRowCount())
+{
+mask &= (~0ULL) >> ((i - fgid + 1) * 64 - fRowGroupOut->getRowCount());
+}
+opos = (i - fgid) * 64;
+if (mask == ~0ULL)
+{
+if (LIKELY(pos != opos))
+moveRows(rgdata.get(), pos, opos, 64);
+pos += 64;
+continue;
+}
+
+if (mask == 0)
+continue;
+
+while (mask != 0)
+{
+size_t b = __builtin_ffsll(mask);
+size_t e = __builtin_ffsll(~(mask >> b)) + b;
+if (UNLIKELY(e >= 64))
+mask = 0;
+else
+mask >>= e;
+if (LIKELY(pos != opos + b - 1))
+moveRows(rgdata.get(), pos, opos + b - 1, e - b);
+pos += e - b;
+opos += e;
+}
+--opos;
+}

 if (pos == 0)
 {
@@ -1251,7 +1119,6 @@ class RowGroupStorage
 fRowGroupOut->setRowCount(pos);
 }

-// Release the memory used by the current rgdata.
 if (rgdata)
 {
 fRowGroupOut->setData(rgdata.get());
|
|||||||
{
|
{
|
||||||
fname = makeRGFilename(rgid);
|
fname = makeRGFilename(rgid);
|
||||||
}
|
}
|
||||||
// to periodically clean up freed memory so it can be used by other threads.
|
|
||||||
fLRU->remove(rgid);
|
fLRU->remove(rgid);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -1303,7 +1169,7 @@ class RowGroupStorage
 loadRG(rgid, fRGDatas[rgid]);
 }

-void loadRG(uint64_t rgid, RGDataUnPtr& rgdata, bool unlinkDump = false)
+void loadRG(uint64_t rgid, std::unique_ptr<RGData>& rgdata, bool unlinkDump = false)
 {
 auto fname = makeRGFilename(rgid);

@@ -1871,7 +1737,7 @@ void RowAggStorage::append(RowAggStorage& other)
 }
 }

-RGDataUnPtr RowAggStorage::getNextRGData()
+std::unique_ptr<RGData> RowAggStorage::getNextRGData()
 {
 if (!fStorage)
 {
@@ -1882,43 +1748,6 @@ RGDataUnPtr RowAggStorage::getNextRGData()
 return fStorage->getNextRGData();
 }

-bool RowAggStorage::getNextOutputRGData(RGDataUnPtr& rgdata)
-{
-if (!fStorage)
-{
-return {};
-}
-
-cleanup();
-freeData();
-
-// fGeneration is an unsigned int, we need a signed int for a comparison >= 0
-int32_t gen = fGeneration;
-while (gen >= 0)
-{
-bool moreInGeneration = fStorage->getNextOutputRGData(rgdata);
-
-if (moreInGeneration)
-{
-fRowGroupOut->setData(rgdata.get());
-return true;
-}
-
-// all generations have been emptied
-if (fGeneration == 0)
-{
-break;
-}
-
-// current generation has no more RGDatas to return
-// load earlier generation and continue with returning its RGDatas
-gen--;
-fGeneration--;
-fStorage.reset(fStorage->clone(fGeneration));
-}
-return false;
-}
-
 void RowAggStorage::freeData()
 {
 for (auto& data : fGens)
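The removed RowAggStorage::getNextOutputRGData() drains results generation by generation: it returns RowGroups from the current spill generation and, once that generation is exhausted, decrements fGeneration and reloads the previous one until generation 0 is empty. A schematic of that newest-to-oldest drain loop, with hypothetical interfaces:

    #include <cstdio>
    #include <vector>

    // Illustrative only: drain per-generation storages from newest to oldest,
    // the way the removed getNextOutputRGData() walked spill generations.
    struct Generation
    {
      std::vector<int> groups;                 // stand-in for the RGDatas of one generation
      bool next(int& out)
      {
        if (groups.empty())
          return false;
        out = groups.back();
        groups.pop_back();
        return true;
      }
    };

    bool nextOutput(std::vector<Generation>& gens, size_t& current, int& out)
    {
      while (true)
      {
        if (gens[current].next(out))           // more data in this generation
          return true;
        if (current == 0)                      // oldest generation exhausted too
          return false;
        --current;                             // fall back to the previous generation
      }
    }

    int main()
    {
      std::vector<Generation> gens = {{{1, 2}}, {{3, 4}}};  // gen 0 and gen 1
      size_t current = gens.size() - 1;        // start at the newest generation
      int v;
      while (nextOutput(gens, current, v))
        std::printf("%d ", v);                 // prints 4 3 2 1
      std::printf("\n");
      return 0;
    }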
@@ -20,7 +20,6 @@
 #include "resourcemanager.h"
 #include "rowgroup.h"
 #include "idbcompress.h"
-#include <cstdint>
 #include <random>
 #include <sys/stat.h>
 #include <unistd.h>
@@ -36,15 +35,10 @@ class RowPosHashStorage;
 using RowPosHashStoragePtr = std::unique_ptr<RowPosHashStorage>;
 class RowGroupStorage;

-using RGDataUnPtr = std::unique_ptr<RGData>;
-using PosOpos = std::pair<uint64_t, uint64_t>;
-using FgidTgid = std::pair<uint64_t, uint64_t>;
-
 uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol);

 constexpr const size_t MaxConstStrSize = 2048ULL;
 constexpr const size_t MaxConstStrBufSize = MaxConstStrSize << 1;
-constexpr const uint64_t HashMaskElements = 64ULL;

 class RowAggStorage
 {
@@ -103,12 +97,6 @@ class RowAggStorage
 */
 std::unique_ptr<RGData> getNextRGData();

-/** @brief Remove last RGData from in-memory storage or disk.
-* Iterates over all generations on disk if available.
-* @returns True if RGData is returned in parameter or false if no more RGDatas can be returned.
-*/
-bool getNextOutputRGData(std::unique_ptr<RGData>& rgdata);
-
 /** @brief TODO
 *
 * @param mergeFunc
@@ -28,7 +28,6 @@
 #include <sys/stat.h>
 #include <cerrno>
 #include <fcntl.h>
-#include <mutex>
 #include <unistd.h>

 #include <iostream>