You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
feat(optimizer): PoC for EI stats retrieval in getSelectPlan()
This commit is contained in:
@ -15,6 +15,13 @@
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
|
||||
#include "rulebased_optimizer.h"
|
||||
|
||||
#include "constantcolumn.h"
|
||||
#include "execplan/calpontselectexecutionplan.h"
|
||||
#include "execplan/simplecolumn.h"
|
||||
@ -23,44 +30,45 @@
|
||||
#include "operator.h"
|
||||
#include "predicateoperator.h"
|
||||
#include "simplefilter.h"
|
||||
#include "rulebased_optimizer.h"
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
|
||||
namespace optimizer
|
||||
{
|
||||
|
||||
void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, const size_t id);
|
||||
|
||||
static const std::string RewrittenSubTableAliasPrefix = "$added_sub_";
|
||||
|
||||
// Apply a list of rules to a CSEP
|
||||
bool optimizeCSEPWithRules(execplan::CalpontSelectExecutionPlan& root, const std::vector<Rule>& rules)
|
||||
bool optimizeCSEPWithRules(execplan::CalpontSelectExecutionPlan& root, const std::vector<Rule>& rules,
|
||||
optimizer::RBOptimizerContext& ctx)
|
||||
{
|
||||
bool changed = false;
|
||||
for (const auto& rule : rules)
|
||||
{
|
||||
changed |= rule.apply(root);
|
||||
changed |= rule.apply(root, ctx);
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
// high level API call for optimizer
|
||||
bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root)
|
||||
bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root, optimizer::RBOptimizerContext& ctx)
|
||||
{
|
||||
optimizer::Rule parallelCES{"parallelCES", optimizer::matchParallelCES, optimizer::applyParallelCES};
|
||||
|
||||
std::vector<Rule> rules = {parallelCES};
|
||||
std::vector<optimizer::Rule> rules = {parallelCES};
|
||||
|
||||
return optimizeCSEPWithRules(root, rules);
|
||||
return optimizeCSEPWithRules(root, rules, ctx);
|
||||
}
|
||||
|
||||
// Apply iteratively until CSEP is converged by rule
|
||||
bool Rule::apply(execplan::CalpontSelectExecutionPlan& root) const
|
||||
bool Rule::apply(execplan::CalpontSelectExecutionPlan& root, optimizer::RBOptimizerContext& ctx) const
|
||||
{
|
||||
bool changedThisRound = false;
|
||||
bool hasBeenApplied = false;
|
||||
|
||||
do
|
||||
{
|
||||
changedThisRound = walk(root);
|
||||
changedThisRound = walk(root, ctx);
|
||||
hasBeenApplied |= changedThisRound;
|
||||
} while (changedThisRound && !applyOnlyOnce);
|
||||
|
||||
@ -68,69 +76,42 @@ bool Rule::apply(execplan::CalpontSelectExecutionPlan& root) const
|
||||
}
|
||||
|
||||
// DFS walk to match CSEP and apply rules if match
|
||||
bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep) const
|
||||
bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx) const
|
||||
{
|
||||
bool rewrite = false;
|
||||
|
||||
for (auto& table : csep.derivedTableList())
|
||||
std::stack<execplan::CalpontSelectExecutionPlan*> planStack;
|
||||
planStack.push(&csep);
|
||||
|
||||
while (!planStack.empty())
|
||||
{
|
||||
auto* csepPtr = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(table.get());
|
||||
if (!csepPtr)
|
||||
execplan::CalpontSelectExecutionPlan* current = planStack.top();
|
||||
planStack.pop();
|
||||
|
||||
for (auto& table : current->derivedTableList())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
auto& csepLocal = *csepPtr;
|
||||
rewrite |= walk(csepLocal);
|
||||
}
|
||||
|
||||
for (auto& unionUnit : csep.unionVec())
|
||||
{
|
||||
auto* unionUnitPtr = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(unionUnit.get());
|
||||
if (!unionUnitPtr)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
auto& unionUnitLocal = *unionUnitPtr;
|
||||
rewrite |= walk(unionUnitLocal);
|
||||
}
|
||||
|
||||
if (csep.filters() != nullptr)
|
||||
{
|
||||
bool rewriteLocal = false;
|
||||
std::vector<execplan::ParseTree*> stack;
|
||||
stack.push_back(csep.filters());
|
||||
while (!stack.empty())
|
||||
{
|
||||
execplan::ParseTree* node = stack.back();
|
||||
stack.pop_back();
|
||||
if (node == nullptr)
|
||||
continue;
|
||||
|
||||
auto* existsFilter = dynamic_cast<execplan::ExistsFilter*>(node->data());
|
||||
if (existsFilter)
|
||||
auto* csepPtr = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(table.get());
|
||||
if (csepPtr)
|
||||
{
|
||||
if (matchRule(*existsFilter->sub()))
|
||||
{
|
||||
applyRule(*existsFilter->sub());
|
||||
rewriteLocal = true;
|
||||
}
|
||||
planStack.push(csepPtr);
|
||||
}
|
||||
|
||||
if (node->right())
|
||||
stack.push_back(node->right());
|
||||
if (node->left())
|
||||
stack.push_back(node->left());
|
||||
}
|
||||
if (rewriteLocal)
|
||||
rewrite |= rewriteLocal;
|
||||
}
|
||||
|
||||
if (matchRule(csep))
|
||||
{
|
||||
applyRule(csep);
|
||||
rewrite = true;
|
||||
for (auto& unionUnit : current->unionVec())
|
||||
{
|
||||
auto* unionUnitPtr = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(unionUnit.get());
|
||||
if (unionUnitPtr)
|
||||
{
|
||||
planStack.push(unionUnitPtr);
|
||||
}
|
||||
}
|
||||
|
||||
if (matchRule(*current))
|
||||
{
|
||||
applyRule(*current, ctx);
|
||||
++ctx.uniqueId;
|
||||
rewrite = true;
|
||||
}
|
||||
}
|
||||
|
||||
return rewrite;
|
||||
@ -177,7 +158,7 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep,
|
||||
auto* filterColLeftOp = new execplan::ConstantColumnUInt(bound.second, 0, 0);
|
||||
// set TZ
|
||||
// There is a question with ownership of the const column
|
||||
// WIP here we lost upper bound value if predicate is not changed to weak lt
|
||||
// WIP here we lost upper bound value if predicate is not changed to weak lt
|
||||
execplan::SOP ltOp = boost::make_shared<execplan::Operator>(execplan::PredicateOperator("<"));
|
||||
ltOp->setOpType(filterColLeftOp->resultType(), tableKeyColumnLeftOp->resultType());
|
||||
ltOp->resultType(ltOp->operationType());
|
||||
@ -210,14 +191,42 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep,
|
||||
return ptp;
|
||||
}
|
||||
|
||||
execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
return dynamic_cast<execplan::SimpleColumn*>(csep.returnedCols().front().get());
|
||||
}
|
||||
|
||||
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
||||
const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep)
|
||||
execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
|
||||
{
|
||||
execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
||||
unionVec.reserve(numberOfLegs);
|
||||
std::vector<std::pair<uint64_t, uint64_t>> bounds({{0, 3000961},
|
||||
{3000961, std::numeric_limits<uint64_t>::max()}});
|
||||
for (auto bound : bounds)
|
||||
// unionVec.reserve(numberOfLegs);
|
||||
execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep);
|
||||
std::cout << "looking for " << keyColumn->columnName() << " in ctx.gwi.columnStatisticsMap " << " with size " << ctx.gwi.columnStatisticsMap.size() << std::endl;
|
||||
for (auto& [k, v] : ctx.gwi.columnStatisticsMap)
|
||||
{
|
||||
std::cout << "key " << k << std::endl;
|
||||
}
|
||||
if (!keyColumn ||
|
||||
ctx.gwi.columnStatisticsMap.find(keyColumn->columnName()) == ctx.gwi.columnStatisticsMap.end())
|
||||
{
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
auto columnStatistics = ctx.gwi.columnStatisticsMap[keyColumn->columnName()];
|
||||
std::cout << "columnStatistics.size() " << columnStatistics.size() << std::endl;
|
||||
// TODO char and other numerical types support
|
||||
std::vector<std::pair<uint64_t, uint64_t>> bounds;
|
||||
std::transform(columnStatistics.begin(), columnStatistics.end(), std::back_inserter(bounds),
|
||||
[](const auto& bucket)
|
||||
{
|
||||
uint64_t lowerBound = std::stoul(bucket.start_value);
|
||||
uint64_t upperBound = lowerBound + bucket.ndv;
|
||||
return std::make_pair(lowerBound, upperBound);
|
||||
});
|
||||
// std::vector<std::pair<uint64_t, uint64_t>> bounds({{0, 3000961},
|
||||
// // {3000961, std::numeric_limits<uint64_t>::max()}});
|
||||
for (auto& bound : bounds)
|
||||
{
|
||||
auto clonedCSEP = csep.cloneWORecursiveSelects();
|
||||
// Add BETWEEN based on key column range
|
||||
@ -227,8 +236,7 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
||||
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep)
|
||||
void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx)
|
||||
{
|
||||
auto tables = csep.tableList();
|
||||
execplan::CalpontSelectExecutionPlan::TableList newTableList;
|
||||
@ -242,16 +250,17 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
auto derivedSCEP = csep.cloneWORecursiveSelects();
|
||||
// need to add a level here
|
||||
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table;
|
||||
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
|
||||
std::to_string(ctx.uniqueId);
|
||||
|
||||
derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
||||
derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
||||
derivedSCEP->derivedTbAlias(tableAlias);
|
||||
|
||||
// TODO: hardcoded for now
|
||||
size_t parallelFactor = 2;
|
||||
// size_t parallelFactor = 2;
|
||||
// Create a copy of the current leaf CSEP with additional filters to partition the key column
|
||||
auto additionalUnionVec = makeUnionFromTable(parallelFactor, csep);
|
||||
auto additionalUnionVec = makeUnionFromTable(csep, ctx);
|
||||
derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
||||
additionalUnionVec.end());
|
||||
|
||||
@ -275,11 +284,95 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep)
|
||||
execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
|
||||
newTableList.push_back(tn);
|
||||
// Remove the filters as they were pushed down to union units
|
||||
// This is inappropriate for EXISTS filter and join conditions
|
||||
derivedSCEP->filters(nullptr);
|
||||
}
|
||||
}
|
||||
// Remove the filters as they were pushed down to union units
|
||||
csep.filters(nullptr);
|
||||
// This is inappropriate for EXISTS filter and join conditions
|
||||
// csep.filters(nullptr);
|
||||
// There must be no derived at this point.
|
||||
csep.derivedTableList(newDerivedTableList);
|
||||
// Replace table list with new table list populated with union units
|
||||
csep.tableList(newTableList);
|
||||
csep.returnedCols(newReturnedColumns);
|
||||
}
|
||||
|
||||
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable_exists(
|
||||
const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
||||
unionVec.reserve(numberOfLegs);
|
||||
std::vector<std::pair<uint64_t, uint64_t>> bounds(
|
||||
{{0, 3000961}, {3000961, std::numeric_limits<uint64_t>::max()}});
|
||||
for (auto bound : bounds)
|
||||
{
|
||||
auto clonedCSEP = csep.cloneWORecursiveSelects();
|
||||
clonedCSEP->filters(nullptr);
|
||||
// Add BETWEEN based on key column range
|
||||
clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound));
|
||||
unionVec.push_back(clonedCSEP);
|
||||
}
|
||||
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx)
|
||||
{
|
||||
auto tables = csep.tableList();
|
||||
execplan::CalpontSelectExecutionPlan::TableList newTableList;
|
||||
execplan::CalpontSelectExecutionPlan::SelectList newDerivedTableList;
|
||||
execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
|
||||
|
||||
// ATM Must be only 1 table
|
||||
for (auto& table : tables)
|
||||
{
|
||||
if (!table.isColumnstore())
|
||||
{
|
||||
auto derivedSCEP = csep.cloneWORecursiveSelects();
|
||||
// need to add a level here
|
||||
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
|
||||
std::to_string(ctx.uniqueId);
|
||||
|
||||
derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
||||
derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
||||
derivedSCEP->derivedTbAlias(tableAlias);
|
||||
|
||||
// TODO: hardcoded for now
|
||||
size_t parallelFactor = 2;
|
||||
// Create a copy of the current leaf CSEP with additional filters to partition the key column
|
||||
auto additionalUnionVec = makeUnionFromTable_exists(parallelFactor, csep);
|
||||
derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
||||
additionalUnionVec.end());
|
||||
|
||||
size_t colPosition = 0;
|
||||
// change parent to derived table columns
|
||||
for (auto& rc : csep.returnedCols())
|
||||
{
|
||||
auto rcCloned = boost::make_shared<execplan::SimpleColumn>(*rc);
|
||||
// TODO timezone and result type are not copied
|
||||
// TODO add specific ctor for this functionality
|
||||
rcCloned->tableName("");
|
||||
rcCloned->schemaName("");
|
||||
rcCloned->tableAlias(tableAlias);
|
||||
rcCloned->colPosition(colPosition++);
|
||||
rcCloned->resultType(rc->resultType());
|
||||
|
||||
newReturnedColumns.push_back(rcCloned);
|
||||
}
|
||||
|
||||
newDerivedTableList.push_back(derivedSCEP);
|
||||
execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
|
||||
newTableList.push_back(tn);
|
||||
// Remove the filters as they were pushed down to union units
|
||||
// This is inappropriate for EXISTS filter and join conditions
|
||||
// TODO if needed
|
||||
derivedSCEP->filters(nullptr);
|
||||
}
|
||||
}
|
||||
// Remove the filters as they were pushed down to union units
|
||||
// This is inappropriate for EXISTS filter and join conditions
|
||||
// csep.filters(nullptr);
|
||||
// There must be no derived at this point.
|
||||
csep.derivedTableList(newDerivedTableList);
|
||||
// Replace table list with new table list populated with union units
|
||||
|
Reference in New Issue
Block a user