You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
feat(primproc,aggregation)!: Changes for ROLLUP with single-phase aggregation (#3025)
The fix is simple: enable subtotals in single-phase aggregation and disable parallel processing when there are subtotals and aggregation is single-phase.
This commit is contained in:
@ -1503,7 +1503,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector<RowGroup>&
|
||||
|
||||
RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
|
||||
jobInfo.stringTableThreshold);
|
||||
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit, false));
|
||||
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit, jobInfo.hasRollup));
|
||||
rowAgg->timeZone(jobInfo.timeZone);
|
||||
rowgroups.push_back(aggRG);
|
||||
aggregators.push_back(rowAgg);
|
||||
@ -5443,7 +5443,6 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
|
||||
for (uint32_t i = 0; i < fNumOfBuckets; i++)
|
||||
{
|
||||
fAggregators[i].reset(fAggregator->clone());
|
||||
fAggregators[i]->clearRollup();
|
||||
fAggregators[i]->setInputOutput(fRowGroupIn, &fRowGroupOuts[i]);
|
||||
}
|
||||
}
|
||||
@ -5497,7 +5496,19 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
|
||||
// The key is the groupby columns, which are the leading columns.
|
||||
// TBD This approach could potentially
|
||||
// put all values in one bucket.
|
||||
uint64_t hash = rowgroup::hashRow(rowIn, hashLens[0] - 1);
|
||||
// The fAggregator->hasRollup() is true when we perform one-phase
|
||||
// aggregation and also are doing subtotals' computations.
|
||||
// Subtotals produce new keys whose hash values may not be in
|
||||
// the processing bucket. Consider case for key tuples (1,2) and (1,3).
|
||||
// Their subtotals' keys will be (1, NULL) and (1, NULL)
|
||||
// but they will be left in their processing buckets and never
|
||||
// get aggregated properly.
|
||||
// Due to this, we put all rows into the same bucket 0 when performing
|
||||
// single-phase aggregation with subtotals.
|
||||
// For all other cases (single-phase without subtotals and two-phase
|
||||
// aggregation with and without subtotals) fAggregator->hasRollup() is false.
|
||||
// In these cases we have full parallel processing as expected.
|
||||
uint64_t hash = fAggregator->hasRollup() ? 0 : rowgroup::hashRow(rowIn, hashLens[0] - 1);
|
||||
int bucketID = hash % fNumOfBuckets;
|
||||
rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
|
||||
rowIn.nextRow();
|
||||
|
@ -56,6 +56,15 @@ Paraguay non-fiction 17790
|
||||
Senegal NULL 171762
|
||||
Senegal fiction 27881
|
||||
Senegal non-fiction 143881
|
||||
SELECT country, genre, SUM(sales) FROM (SELECT country, genre, sales FROM booksales) t1 GROUP BY country, genre WITH ROLLUP;
|
||||
country genre SUM(sales)
|
||||
NULL NULL 354462
|
||||
Paraguay NULL 182700
|
||||
Paraguay fiction 164910
|
||||
Paraguay non-fiction 17790
|
||||
Senegal NULL 171762
|
||||
Senegal fiction 27881
|
||||
Senegal non-fiction 143881
|
||||
CREATE TABLE three_cols ( key1 INTEGER, key2 INTEGER, value DECIMAL(38)) ENGINE=COLUMNSTORE;
|
||||
INSERT INTO three_cols(key1, key2, value) VALUES
|
||||
(NULL, NULL, NULL)
|
||||
|
@ -42,6 +42,9 @@ SELECT year, SUM(sales) FROM booksales GROUP BY year WITH ROLLUP;
|
||||
--sorted_result
|
||||
SELECT country, genre, SUM(sales) FROM booksales GROUP BY country, genre WITH ROLLUP;
|
||||
|
||||
--sorted_result
|
||||
SELECT country, genre, SUM(sales) FROM (SELECT country, genre, sales FROM booksales) t1 GROUP BY country, genre WITH ROLLUP;
|
||||
|
||||
CREATE TABLE three_cols ( key1 INTEGER, key2 INTEGER, value DECIMAL(38)) ENGINE=COLUMNSTORE;
|
||||
|
||||
INSERT INTO three_cols(key1, key2, value) VALUES
|
||||
|
@ -42,7 +42,7 @@ int64_t encodeStringPrefix(const uint8_t* str, size_t len, datatypes::Charset& c
|
||||
|
||||
int64_t encodeStringPrefix_check_null(const uint8_t* str, size_t len, datatypes::Charset& cset)
|
||||
{
|
||||
if (len < 1 && str == nullptr)
|
||||
if (len < 1)
|
||||
{
|
||||
return joblist::UBIGINTNULL;
|
||||
}
|
||||
|
@ -425,6 +425,8 @@ class RowAggregation : public messageqcpp::Serializeable
|
||||
|
||||
void clearRollup() { fRollupFlag = false; }
|
||||
|
||||
bool hasRollup() const { return fRollupFlag; }
|
||||
|
||||
/** @brief Define content of data to be joined
|
||||
*
|
||||
* This method must be call after setInputOutput() for PM hashjoin case.
|
||||
|
Reference in New Issue
Block a user