You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
feat(rbo,rules): find suitable indexed column to be used for range partitioning
This commit is contained in:
@ -65,16 +65,16 @@ bool matchParallelCES(execplan::CalpontSelectExecutionPlan& csep)
|
||||
|
||||
// This routine produces a new ParseTree that is AND(lowerBand <= column, column <= upperBand)
|
||||
// TODO add engine-independent statistics-derived ranges
|
||||
execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep,
|
||||
execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, execplan::SimpleColumn& column,
|
||||
std::pair<uint64_t, uint64_t>& bound)
|
||||
{
|
||||
// INV this is SimpleColumn we supply as an argument
|
||||
// TODO find the suitable column using EI statistics.
|
||||
auto* column = dynamic_cast<execplan::SimpleColumn*>(csep->returnedCols().front().get());
|
||||
assert(column);
|
||||
// auto* column = dynamic_cast<execplan::SimpleColumn*>(csep->returnedCols().front().get());
|
||||
// assert(column);
|
||||
|
||||
auto tableKeyColumnLeftOp = new execplan::SimpleColumn(*column);
|
||||
tableKeyColumnLeftOp->resultType(column->resultType());
|
||||
auto tableKeyColumnLeftOp = new execplan::SimpleColumn(column);
|
||||
tableKeyColumnLeftOp->resultType(column.resultType());
|
||||
|
||||
// TODO Nobody owns this allocation and cleanup only depends on delete in ParseTree nodes' dtors.
|
||||
auto* filterColLeftOp = new execplan::ConstantColumnUInt(bound.second, 0, 0);
|
||||
@ -87,8 +87,8 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep,
|
||||
|
||||
auto* sfr = new execplan::SimpleFilter(ltOp, tableKeyColumnLeftOp, filterColLeftOp);
|
||||
// auto tableKeyColumn = derivedSCEP->returnedCols().front();
|
||||
auto tableKeyColumnRightOp = new execplan::SimpleColumn(*column);
|
||||
tableKeyColumnRightOp->resultType(column->resultType());
|
||||
auto tableKeyColumnRightOp = new execplan::SimpleColumn(column);
|
||||
tableKeyColumnRightOp->resultType(column.resultType());
|
||||
// TODO hardcoded column type and value
|
||||
auto* filterColRightOp = new execplan::ConstantColumnUInt(bound.first, 0, 0);
|
||||
|
||||
@ -117,16 +117,24 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep,
|
||||
// INV nullptr signifies that no suitable column was found
|
||||
execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
|
||||
{
|
||||
std::cout << "findSuitableKeyColumn " << csep.returnedCols().size() << std::endl;
|
||||
for (auto& rc : csep.returnedCols())
|
||||
{
|
||||
auto* simpleColumn = dynamic_cast<execplan::SimpleColumn*>(rc.get());
|
||||
if (simpleColumn)
|
||||
{
|
||||
std::cout << "Found simple column " << simpleColumn->columnName() << std::endl;
|
||||
cal_impl_if::SchemaAndTableName schemaAndTableNam = {simpleColumn->tableName(), simpleColumn->columnName()};
|
||||
cal_impl_if::SchemaAndTableName schemaAndTableNam = {simpleColumn->schemaName(), simpleColumn->tableName()};
|
||||
auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableNam);
|
||||
|
||||
return simpleColumn;
|
||||
if (!columnStatistics)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
auto columnStatisticsIt = columnStatistics->find(simpleColumn->columnName());
|
||||
if (columnStatisticsIt != columnStatistics->end())
|
||||
{
|
||||
return simpleColumn;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,9 +175,8 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
||||
}
|
||||
|
||||
auto columnStatistics = columnStatisticsIt->second;
|
||||
std::cout << "Histogram_json_hb histogram size " << columnStatistics.get_json_histogram().size() << std::endl;
|
||||
|
||||
// TODO configurable parallel factor
|
||||
// TODO configurable parallel factor via session variable
|
||||
// NB now histogram size is the way to control parallel factor with 16 being the maximum
|
||||
size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor);
|
||||
size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits;
|
||||
@ -188,23 +195,26 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
||||
|
||||
std::cout << "currentLowerBound " << currentLowerBound << " currentUpperBound " << currentUpperBound
|
||||
<< std::endl;
|
||||
bounds.push_back(std::make_pair(currentLowerBound, currentUpperBound));
|
||||
bounds.push_back({currentLowerBound, currentUpperBound});
|
||||
}
|
||||
|
||||
// Add last range
|
||||
// NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start
|
||||
auto lastBucket = columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit;
|
||||
uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data();
|
||||
std::cout << "lastBucket start_value " << currentLowerBound << std::endl;
|
||||
uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data();
|
||||
std::cout << "Histogram end_value " << currentUpperBound << std::endl;
|
||||
|
||||
std::cout << "last currentLowerBound " << currentLowerBound << " last currentUpperBound " << currentUpperBound
|
||||
<< std::endl;
|
||||
bounds.push_back(std::make_pair(currentLowerBound, currentUpperBound));
|
||||
bounds.push_back({currentLowerBound, currentUpperBound});
|
||||
|
||||
for (auto& bound : bounds)
|
||||
{
|
||||
auto clonedCSEP = csep.cloneWORecursiveSelects();
|
||||
// Add BETWEEN based on key column range
|
||||
clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound));
|
||||
clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, *keyColumn, bound));
|
||||
unionVec.push_back(clonedCSEP);
|
||||
}
|
||||
|
||||
@ -270,87 +280,87 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
|
||||
csep.returnedCols(newReturnedColumns);
|
||||
}
|
||||
|
||||
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable_exists(
|
||||
const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
||||
unionVec.reserve(numberOfLegs);
|
||||
std::vector<std::pair<uint64_t, uint64_t>> bounds(
|
||||
{{0, 3000961}, {3000961, std::numeric_limits<uint64_t>::max()}});
|
||||
for (auto bound : bounds)
|
||||
{
|
||||
auto clonedCSEP = csep.cloneWORecursiveSelects();
|
||||
clonedCSEP->filters(nullptr);
|
||||
// Add BETWEEN based on key column range
|
||||
clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound));
|
||||
unionVec.push_back(clonedCSEP);
|
||||
}
|
||||
// execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable_exists(
|
||||
// const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep)
|
||||
// {
|
||||
// execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
||||
// unionVec.reserve(numberOfLegs);
|
||||
// std::vector<std::pair<uint64_t, uint64_t>> bounds(
|
||||
// {{0, 3000961}, {3000961, std::numeric_limits<uint64_t>::max()}});
|
||||
// for (auto bound : bounds)
|
||||
// {
|
||||
// auto clonedCSEP = csep.cloneWORecursiveSelects();
|
||||
// clonedCSEP->filters(nullptr);
|
||||
// // Add BETWEEN based on key column range
|
||||
// clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound));
|
||||
// unionVec.push_back(clonedCSEP);
|
||||
// }
|
||||
|
||||
return unionVec;
|
||||
}
|
||||
// return unionVec;
|
||||
// }
|
||||
|
||||
// TODO: remove applyParallelCES_exists
|
||||
void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx)
|
||||
{
|
||||
auto tables = csep.tableList();
|
||||
execplan::CalpontSelectExecutionPlan::TableList newTableList;
|
||||
execplan::CalpontSelectExecutionPlan::SelectList newDerivedTableList;
|
||||
execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
|
||||
// // TODO: remove applyParallelCES_exists
|
||||
// void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx)
|
||||
// {
|
||||
// auto tables = csep.tableList();
|
||||
// execplan::CalpontSelectExecutionPlan::TableList newTableList;
|
||||
// execplan::CalpontSelectExecutionPlan::SelectList newDerivedTableList;
|
||||
// execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
|
||||
|
||||
// ATM Must be only 1 table
|
||||
for (auto& table : tables)
|
||||
{
|
||||
if (!table.isColumnstore())
|
||||
{
|
||||
auto derivedSCEP = csep.cloneWORecursiveSelects();
|
||||
// need to add a level here
|
||||
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
|
||||
std::to_string(ctx.uniqueId);
|
||||
// // ATM Must be only 1 table
|
||||
// for (auto& table : tables)
|
||||
// {
|
||||
// if (!table.isColumnstore())
|
||||
// {
|
||||
// auto derivedSCEP = csep.cloneWORecursiveSelects();
|
||||
// // need to add a level here
|
||||
// std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
|
||||
// std::to_string(ctx.uniqueId);
|
||||
|
||||
derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
||||
derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
||||
derivedSCEP->derivedTbAlias(tableAlias);
|
||||
// derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
||||
// derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
||||
// derivedSCEP->derivedTbAlias(tableAlias);
|
||||
|
||||
// TODO: hardcoded for now
|
||||
size_t parallelFactor = 2;
|
||||
// Create a copy of the current leaf CSEP with additional filters to partition the key column
|
||||
auto additionalUnionVec = makeUnionFromTable_exists(parallelFactor, csep);
|
||||
derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
||||
additionalUnionVec.end());
|
||||
// // TODO: hardcoded for now
|
||||
// size_t parallelFactor = 2;
|
||||
// // Create a copy of the current leaf CSEP with additional filters to partition the key column
|
||||
// auto additionalUnionVec = makeUnionFromTable_exists(parallelFactor, csep);
|
||||
// derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
||||
// additionalUnionVec.end());
|
||||
|
||||
size_t colPosition = 0;
|
||||
// change parent to derived table columns
|
||||
for (auto& rc : csep.returnedCols())
|
||||
{
|
||||
auto rcCloned = boost::make_shared<execplan::SimpleColumn>(*rc);
|
||||
// TODO timezone and result type are not copied
|
||||
// TODO add specific ctor for this functionality
|
||||
rcCloned->tableName("");
|
||||
rcCloned->schemaName("");
|
||||
rcCloned->tableAlias(tableAlias);
|
||||
rcCloned->colPosition(colPosition++);
|
||||
rcCloned->resultType(rc->resultType());
|
||||
// size_t colPosition = 0;
|
||||
// // change parent to derived table columns
|
||||
// for (auto& rc : csep.returnedCols())
|
||||
// {
|
||||
// auto rcCloned = boost::make_shared<execplan::SimpleColumn>(*rc);
|
||||
// // TODO timezone and result type are not copied
|
||||
// // TODO add specific ctor for this functionality
|
||||
// rcCloned->tableName("");
|
||||
// rcCloned->schemaName("");
|
||||
// rcCloned->tableAlias(tableAlias);
|
||||
// rcCloned->colPosition(colPosition++);
|
||||
// rcCloned->resultType(rc->resultType());
|
||||
|
||||
newReturnedColumns.push_back(rcCloned);
|
||||
}
|
||||
// newReturnedColumns.push_back(rcCloned);
|
||||
// }
|
||||
|
||||
newDerivedTableList.push_back(derivedSCEP);
|
||||
execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
|
||||
newTableList.push_back(tn);
|
||||
// Remove the filters as they were pushed down to union units
|
||||
// This is inappropriate for EXISTS filter and join conditions
|
||||
// TODO if needed
|
||||
derivedSCEP->filters(nullptr);
|
||||
}
|
||||
}
|
||||
// Remove the filters as they were pushed down to union units
|
||||
// This is inappropriate for EXISTS filter and join conditions
|
||||
// csep.filters(nullptr);
|
||||
// There must be no derived at this point.
|
||||
csep.derivedTableList(newDerivedTableList);
|
||||
// Replace table list with new table list populated with union units
|
||||
csep.tableList(newTableList);
|
||||
csep.returnedCols(newReturnedColumns);
|
||||
}
|
||||
// newDerivedTableList.push_back(derivedSCEP);
|
||||
// execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
|
||||
// newTableList.push_back(tn);
|
||||
// // Remove the filters as they were pushed down to union units
|
||||
// // This is inappropriate for EXISTS filter and join conditions
|
||||
// // TODO if needed
|
||||
// derivedSCEP->filters(nullptr);
|
||||
// }
|
||||
// }
|
||||
// // Remove the filters as they were pushed down to union units
|
||||
// // This is inappropriate for EXISTS filter and join conditions
|
||||
// // csep.filters(nullptr);
|
||||
// // There must be no derived at this point.
|
||||
// csep.derivedTableList(newDerivedTableList);
|
||||
// // Replace table list with new table list populated with union units
|
||||
// csep.tableList(newTableList);
|
||||
// csep.returnedCols(newReturnedColumns);
|
||||
// }
|
||||
|
||||
} // namespace optimizer
|
||||
|
Reference in New Issue
Block a user