You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
feat(optimizer): first cut for rewrite foreign table into UNION rule
This commit is contained in:
@ -205,7 +205,7 @@ string CalpontSelectExecutionPlan::toString(const size_t ident) const
|
||||
{
|
||||
ostringstream output;
|
||||
|
||||
output << std::string(ident, ' ') << "SELECT ";
|
||||
output << "SELECT ";
|
||||
|
||||
if (distinct())
|
||||
{
|
||||
@ -340,14 +340,16 @@ string CalpontSelectExecutionPlan::toString(const size_t ident) const
|
||||
output << "QueryType: " << queryType() << endlWithIndent(ident);
|
||||
|
||||
if (!unionVec().empty())
|
||||
output << "\n--- Union Unit ---" << endlWithIndent(ident);
|
||||
{
|
||||
output << "--- Union Unit ---";
|
||||
}
|
||||
|
||||
for (unsigned i = 0; i < unionVec().size(); i++)
|
||||
{
|
||||
CalpontSelectExecutionPlan* plan = dynamic_cast<CalpontSelectExecutionPlan*>(unionVec()[i].get());
|
||||
|
||||
if (plan)
|
||||
output << "{" << *plan << "}\n" << endlWithIndent(ident);
|
||||
output << endlWithIndent(ident) << "{" << plan->toString(ident + 2) << "}\n" << endlWithIndent(ident);
|
||||
}
|
||||
|
||||
return output.str();
|
||||
@ -858,4 +860,151 @@ void CalpontSelectExecutionPlan::pron(std::string&& pron)
|
||||
fPron = pron;
|
||||
}
|
||||
|
||||
execplan::SCSEP CalpontSelectExecutionPlan::cloneWORecursiveSelects()
|
||||
{
|
||||
execplan::SCSEP newPlan(new CalpontSelectExecutionPlan(fLocation));
|
||||
|
||||
// Copy simple members
|
||||
newPlan->fLocalQuery = fLocalQuery;
|
||||
newPlan->fTableAlias = fTableAlias;
|
||||
newPlan->fLocation = fLocation;
|
||||
newPlan->fDependent = fDependent;
|
||||
newPlan->fData = fData;
|
||||
newPlan->fSessionID = fSessionID;
|
||||
newPlan->fTxnID = fTxnID;
|
||||
newPlan->fVerID = fVerID;
|
||||
newPlan->fSchemaName = fSchemaName;
|
||||
newPlan->fTableName = fTableName;
|
||||
newPlan->fTraceFlags = fTraceFlags;
|
||||
newPlan->fStatementID = fStatementID;
|
||||
newPlan->fDistinct = fDistinct;
|
||||
newPlan->fOverrideLargeSideEstimate = fOverrideLargeSideEstimate;
|
||||
newPlan->fDistinctUnionNum = fDistinctUnionNum;
|
||||
newPlan->fSubType = fSubType;
|
||||
newPlan->fDerivedTbAlias = fDerivedTbAlias;
|
||||
newPlan->fDerivedTbView = fDerivedTbView;
|
||||
newPlan->fLimitStart = fLimitStart;
|
||||
newPlan->fLimitNum = fLimitNum;
|
||||
newPlan->fHasOrderBy = fHasOrderBy;
|
||||
newPlan->fStringScanThreshold = fStringScanThreshold;
|
||||
newPlan->fQueryType = fQueryType;
|
||||
newPlan->fPriority = fPriority;
|
||||
newPlan->fStringTableThreshold = fStringTableThreshold;
|
||||
newPlan->fSpecHandlerProcessed = fSpecHandlerProcessed;
|
||||
newPlan->fOrderByThreads = fOrderByThreads;
|
||||
newPlan->fUuid = fUuid;
|
||||
newPlan->fDJSSmallSideLimit = fDJSSmallSideLimit;
|
||||
newPlan->fDJSLargeSideLimit = fDJSLargeSideLimit;
|
||||
newPlan->fDJSPartitionSize = fDJSPartitionSize;
|
||||
newPlan->fDJSMaxPartitionTreeDepth = fDJSMaxPartitionTreeDepth;
|
||||
newPlan->fDJSForceRun = fDJSForceRun;
|
||||
newPlan->fMaxPmJoinResultCount = fMaxPmJoinResultCount;
|
||||
newPlan->fUMMemLimit = fUMMemLimit;
|
||||
newPlan->fIsDML = fIsDML;
|
||||
newPlan->fTimeZone = fTimeZone;
|
||||
newPlan->fPron = fPron;
|
||||
newPlan->fWithRollup = fWithRollup;
|
||||
|
||||
// Deep copy of ReturnedColumnList
|
||||
ReturnedColumnList newReturnedCols;
|
||||
for (const auto& col : fReturnedCols)
|
||||
{
|
||||
if (col)
|
||||
newReturnedCols.push_back(SRCP(col->clone()));
|
||||
}
|
||||
newPlan->returnedCols(newReturnedCols);
|
||||
|
||||
// Deep copy of filters
|
||||
if (fFilters)
|
||||
newPlan->filters(new ParseTree(*fFilters));
|
||||
|
||||
// Deep copy of filter token list
|
||||
newPlan->filterTokenList(fFilterTokenList);
|
||||
newPlan->havingTokenList(fHavingTokenList);
|
||||
|
||||
// Deep copy of subselects
|
||||
// SelectList newSubSelects;
|
||||
// for (const auto& sel : fSubSelects)
|
||||
// {
|
||||
// if (sel)
|
||||
// newSubSelects.push_back(SCEP(sel->clone()));
|
||||
// }
|
||||
// newPlan->subSelects(newSubSelects);
|
||||
|
||||
// Deep copy of group by columns
|
||||
GroupByColumnList newGroupByCols;
|
||||
for (const auto& col : fGroupByCols)
|
||||
{
|
||||
if (col)
|
||||
newGroupByCols.push_back(SRCP(col->clone()));
|
||||
}
|
||||
newPlan->groupByCols(newGroupByCols);
|
||||
|
||||
// Deep copy of having clause
|
||||
if (fHaving)
|
||||
newPlan->having(new ParseTree(*fHaving));
|
||||
|
||||
// Deep copy of order by columns
|
||||
OrderByColumnList newOrderByCols;
|
||||
for (const auto& col : fOrderByCols)
|
||||
{
|
||||
if (col)
|
||||
newOrderByCols.push_back(SRCP(col->clone()));
|
||||
}
|
||||
newPlan->orderByCols(newOrderByCols);
|
||||
|
||||
// Deep copy of column map
|
||||
ColumnMap newColumnMap;
|
||||
for (const auto& entry : fColumnMap)
|
||||
{
|
||||
if (entry.second)
|
||||
newColumnMap.insert(ColumnMap::value_type(entry.first, SRCP(entry.second->clone())));
|
||||
}
|
||||
newPlan->columnMap(newColumnMap);
|
||||
|
||||
// Copy RM parameters
|
||||
newPlan->rmParms(frmParms);
|
||||
|
||||
// Deep copy of table list
|
||||
newPlan->tableList(fTableList);
|
||||
|
||||
// // Deep copy of derived table list
|
||||
// SelectList newDerivedTableList;
|
||||
// for (const auto& sel : fDerivedTableList)
|
||||
// {
|
||||
// if (sel)
|
||||
// newDerivedTableList.push_back(SCEP(sel->clone()));
|
||||
// }
|
||||
// newPlan->derivedTableList(newDerivedTableList);
|
||||
|
||||
// // Deep copy of union vector
|
||||
// SelectList newUnionVec;
|
||||
// for (const auto& sel : fUnionVec)
|
||||
// {
|
||||
// if (sel)
|
||||
// newUnionVec.push_back(SCEP(sel->clone()));
|
||||
// }
|
||||
// newPlan->unionVec(newUnionVec);
|
||||
|
||||
// // Deep copy of select subqueries
|
||||
// SelectList newSelectSubList;
|
||||
// for (const auto& sel : fSelectSubList)
|
||||
// {
|
||||
// if (sel)
|
||||
// newSelectSubList.push_back(SCEP(sel->clone()));
|
||||
// }
|
||||
// newPlan->selectSubList(newSelectSubList);
|
||||
|
||||
// // Deep copy of subquery list
|
||||
// std::vector<SCSEP> newSubSelectList;
|
||||
// for (const auto& sel : fSubSelectList)
|
||||
// {
|
||||
// if (sel)
|
||||
// newSubSelectList.push_back(SCSEP(sel->clone()));
|
||||
// }
|
||||
// newPlan->subSelectList(newSubSelectList);
|
||||
|
||||
return newPlan;
|
||||
}
|
||||
|
||||
} // namespace execplan
|
||||
|
@ -159,6 +159,7 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
|
||||
*/
|
||||
~CalpontSelectExecutionPlan() override;
|
||||
|
||||
execplan::SCSEP cloneWORecursiveSelects();
|
||||
/**
|
||||
* Access and mutator methods
|
||||
*/
|
||||
|
@ -9206,14 +9206,26 @@ int cs_get_derived_plan(ha_columnstore_derived_handler* handler, THD* /*thd*/, S
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool tableIsInUnion(const execplan::CalpontSystemCatalog::TableAliasName& table, CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
return std::any_of(csep.unionVec().begin(), csep.unionVec().end(),
|
||||
[&table](const auto& unionUnit) {
|
||||
auto unionUnitLocal = *dynamic_cast<execplan::CalpontSelectExecutionPlan*>(unionUnit.get());
|
||||
bool tableIsPresented = std::any_of(unionUnitLocal.tableList().begin(), unionUnitLocal.tableList().end(),
|
||||
[&table](const auto& unionTable) {
|
||||
return unionTable == table;
|
||||
});
|
||||
return tableIsPresented;
|
||||
});
|
||||
}
|
||||
|
||||
bool matchParallelCES(CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
std::cout << csep.toString() << std::endl;
|
||||
auto tables = csep.tableList();
|
||||
// This is leaf and there are no other tables at this level.
|
||||
for (auto& table : tables)
|
||||
{
|
||||
if (!table.isColumnstore())
|
||||
if (!table.isColumnstore() && !tableIsInUnion(table, csep))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
@ -9222,28 +9234,37 @@ bool matchParallelCES(CalpontSelectExecutionPlan& csep)
|
||||
return false;
|
||||
}
|
||||
|
||||
// CalpontSelectExecutionPlan tableIntoUnion(CalpontSelectExecutionPlan& table, CalpontSelectExecutionPlan& csep)
|
||||
// {
|
||||
// auto* unionCSEP = {new CalpontSelectExecutionPlan()};
|
||||
// CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
|
||||
// CalpontSelectExecutionPlan::ColumnMap colMap;
|
||||
CalpontSelectExecutionPlan::SelectList makeUnionFromTable(const size_t numberOfLegs,
|
||||
CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
CalpontSelectExecutionPlan::SelectList unionVec;
|
||||
unionVec.reserve(numberOfLegs);
|
||||
for (size_t i = 0; i < numberOfLegs; ++i)
|
||||
{
|
||||
unionVec.emplace_back(csep.cloneWORecursiveSelects());
|
||||
}
|
||||
|
||||
// unionSCEP.unionVec({csep});
|
||||
// return unionSCEP;
|
||||
// }
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
void applyParallelCES(CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
std::cout << "applyParallelCES" << std::endl;
|
||||
std::cout << "original unionVec size " << csep.unionVec().size() << std::endl;
|
||||
|
||||
auto tables = csep.tableList();
|
||||
for (auto it = tables.begin(); it != tables.end(); ++it)
|
||||
{
|
||||
if (!it->isColumnstore())
|
||||
{
|
||||
// auto unionSCEP = tableIntoUnion(*it, csep);
|
||||
// tables.erase(it);
|
||||
// csep.unionVec().push_back(unionSCEP);
|
||||
size_t parallelFactor = 2;
|
||||
auto additionalUnionVec = makeUnionFromTable(parallelFactor, csep);
|
||||
csep.unionVec().insert(csep.unionVec().end(), additionalUnionVec.begin(), additionalUnionVec.end());
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << "modified CSEP" << std::endl;
|
||||
std::cout << "unionVec size " << csep.unionVec().size() << std::endl;
|
||||
}
|
||||
|
||||
struct Rule
|
||||
@ -9257,13 +9278,18 @@ struct Rule
|
||||
void (*apply)(CalpontSelectExecutionPlan&);
|
||||
bool walk(CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
bool rewrite = false;
|
||||
for (auto& table : csep.derivedTableList())
|
||||
{
|
||||
auto csepLocal = *dynamic_cast<execplan::CalpontSelectExecutionPlan*>(table.get());
|
||||
if (match(csepLocal))
|
||||
{
|
||||
apply(csepLocal);
|
||||
return true;
|
||||
rewrite = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
rewrite |= walk(csepLocal);
|
||||
}
|
||||
}
|
||||
|
||||
@ -9274,17 +9300,21 @@ struct Rule
|
||||
if (match(unionUnitLocal))
|
||||
{
|
||||
apply(unionUnitLocal);
|
||||
return true;
|
||||
rewrite = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
rewrite |= walk(unionUnitLocal);
|
||||
}
|
||||
}
|
||||
|
||||
if (match(csep))
|
||||
{
|
||||
apply(csep);
|
||||
return true;
|
||||
rewrite = true;
|
||||
}
|
||||
|
||||
return false;
|
||||
return rewrite;
|
||||
}
|
||||
};
|
||||
|
||||
|
Reference in New Issue
Block a user