From e73e5834abfed7676802845988f6df8f412a1cc2 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Mon, 9 Jun 2025 18:51:52 +0000 Subject: [PATCH] feat(optimizer): first cut for rewrite foreign table into UNION rule --- dbcon/execplan/calpontselectexecutionplan.cpp | 155 +++++++++++++++++- dbcon/execplan/calpontselectexecutionplan.h | 1 + dbcon/mysql/ha_mcs_execplan.cpp | 64 ++++++-- 3 files changed, 200 insertions(+), 20 deletions(-) diff --git a/dbcon/execplan/calpontselectexecutionplan.cpp b/dbcon/execplan/calpontselectexecutionplan.cpp index 3265835d6..feefab04d 100644 --- a/dbcon/execplan/calpontselectexecutionplan.cpp +++ b/dbcon/execplan/calpontselectexecutionplan.cpp @@ -205,7 +205,7 @@ string CalpontSelectExecutionPlan::toString(const size_t ident) const { ostringstream output; - output << std::string(ident, ' ') << "SELECT "; + output << "SELECT "; if (distinct()) { @@ -340,14 +340,16 @@ string CalpontSelectExecutionPlan::toString(const size_t ident) const output << "QueryType: " << queryType() << endlWithIndent(ident); if (!unionVec().empty()) - output << "\n--- Union Unit ---" << endlWithIndent(ident); + { + output << "--- Union Unit ---"; + } for (unsigned i = 0; i < unionVec().size(); i++) { CalpontSelectExecutionPlan* plan = dynamic_cast(unionVec()[i].get()); if (plan) - output << "{" << *plan << "}\n" << endlWithIndent(ident); + output << endlWithIndent(ident) << "{" << plan->toString(ident + 2) << "}\n" << endlWithIndent(ident); } return output.str(); @@ -858,4 +860,151 @@ void CalpontSelectExecutionPlan::pron(std::string&& pron) fPron = pron; } +execplan::SCSEP CalpontSelectExecutionPlan::cloneWORecursiveSelects() +{ + execplan::SCSEP newPlan(new CalpontSelectExecutionPlan(fLocation)); + + // Copy simple members + newPlan->fLocalQuery = fLocalQuery; + newPlan->fTableAlias = fTableAlias; + newPlan->fLocation = fLocation; + newPlan->fDependent = fDependent; + newPlan->fData = fData; + newPlan->fSessionID = fSessionID; + newPlan->fTxnID = fTxnID; + newPlan->fVerID = fVerID; + newPlan->fSchemaName = fSchemaName; + newPlan->fTableName = fTableName; + newPlan->fTraceFlags = fTraceFlags; + newPlan->fStatementID = fStatementID; + newPlan->fDistinct = fDistinct; + newPlan->fOverrideLargeSideEstimate = fOverrideLargeSideEstimate; + newPlan->fDistinctUnionNum = fDistinctUnionNum; + newPlan->fSubType = fSubType; + newPlan->fDerivedTbAlias = fDerivedTbAlias; + newPlan->fDerivedTbView = fDerivedTbView; + newPlan->fLimitStart = fLimitStart; + newPlan->fLimitNum = fLimitNum; + newPlan->fHasOrderBy = fHasOrderBy; + newPlan->fStringScanThreshold = fStringScanThreshold; + newPlan->fQueryType = fQueryType; + newPlan->fPriority = fPriority; + newPlan->fStringTableThreshold = fStringTableThreshold; + newPlan->fSpecHandlerProcessed = fSpecHandlerProcessed; + newPlan->fOrderByThreads = fOrderByThreads; + newPlan->fUuid = fUuid; + newPlan->fDJSSmallSideLimit = fDJSSmallSideLimit; + newPlan->fDJSLargeSideLimit = fDJSLargeSideLimit; + newPlan->fDJSPartitionSize = fDJSPartitionSize; + newPlan->fDJSMaxPartitionTreeDepth = fDJSMaxPartitionTreeDepth; + newPlan->fDJSForceRun = fDJSForceRun; + newPlan->fMaxPmJoinResultCount = fMaxPmJoinResultCount; + newPlan->fUMMemLimit = fUMMemLimit; + newPlan->fIsDML = fIsDML; + newPlan->fTimeZone = fTimeZone; + newPlan->fPron = fPron; + newPlan->fWithRollup = fWithRollup; + + // Deep copy of ReturnedColumnList + ReturnedColumnList newReturnedCols; + for (const auto& col : fReturnedCols) + { + if (col) + newReturnedCols.push_back(SRCP(col->clone())); + } + newPlan->returnedCols(newReturnedCols); + + // Deep copy of filters + if (fFilters) + newPlan->filters(new ParseTree(*fFilters)); + + // Deep copy of filter token list + newPlan->filterTokenList(fFilterTokenList); + newPlan->havingTokenList(fHavingTokenList); + + // Deep copy of subselects + // SelectList newSubSelects; + // for (const auto& sel : fSubSelects) + // { + // if (sel) + // newSubSelects.push_back(SCEP(sel->clone())); + // } + // newPlan->subSelects(newSubSelects); + + // Deep copy of group by columns + GroupByColumnList newGroupByCols; + for (const auto& col : fGroupByCols) + { + if (col) + newGroupByCols.push_back(SRCP(col->clone())); + } + newPlan->groupByCols(newGroupByCols); + + // Deep copy of having clause + if (fHaving) + newPlan->having(new ParseTree(*fHaving)); + + // Deep copy of order by columns + OrderByColumnList newOrderByCols; + for (const auto& col : fOrderByCols) + { + if (col) + newOrderByCols.push_back(SRCP(col->clone())); + } + newPlan->orderByCols(newOrderByCols); + + // Deep copy of column map + ColumnMap newColumnMap; + for (const auto& entry : fColumnMap) + { + if (entry.second) + newColumnMap.insert(ColumnMap::value_type(entry.first, SRCP(entry.second->clone()))); + } + newPlan->columnMap(newColumnMap); + + // Copy RM parameters + newPlan->rmParms(frmParms); + + // Deep copy of table list + newPlan->tableList(fTableList); + + // // Deep copy of derived table list + // SelectList newDerivedTableList; + // for (const auto& sel : fDerivedTableList) + // { + // if (sel) + // newDerivedTableList.push_back(SCEP(sel->clone())); + // } + // newPlan->derivedTableList(newDerivedTableList); + + // // Deep copy of union vector + // SelectList newUnionVec; + // for (const auto& sel : fUnionVec) + // { + // if (sel) + // newUnionVec.push_back(SCEP(sel->clone())); + // } + // newPlan->unionVec(newUnionVec); + + // // Deep copy of select subqueries + // SelectList newSelectSubList; + // for (const auto& sel : fSelectSubList) + // { + // if (sel) + // newSelectSubList.push_back(SCEP(sel->clone())); + // } + // newPlan->selectSubList(newSelectSubList); + + // // Deep copy of subquery list + // std::vector newSubSelectList; + // for (const auto& sel : fSubSelectList) + // { + // if (sel) + // newSubSelectList.push_back(SCSEP(sel->clone())); + // } + // newPlan->subSelectList(newSubSelectList); + + return newPlan; +} + } // namespace execplan diff --git a/dbcon/execplan/calpontselectexecutionplan.h b/dbcon/execplan/calpontselectexecutionplan.h index 9dfa4afcd..a84a3b4bb 100644 --- a/dbcon/execplan/calpontselectexecutionplan.h +++ b/dbcon/execplan/calpontselectexecutionplan.h @@ -159,6 +159,7 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan */ ~CalpontSelectExecutionPlan() override; + execplan::SCSEP cloneWORecursiveSelects(); /** * Access and mutator methods */ diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index cb2323228..d42c1a23f 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -9206,14 +9206,26 @@ int cs_get_derived_plan(ha_columnstore_derived_handler* handler, THD* /*thd*/, S return 0; } +bool tableIsInUnion(const execplan::CalpontSystemCatalog::TableAliasName& table, CalpontSelectExecutionPlan& csep) +{ + return std::any_of(csep.unionVec().begin(), csep.unionVec().end(), + [&table](const auto& unionUnit) { + auto unionUnitLocal = *dynamic_cast(unionUnit.get()); + bool tableIsPresented = std::any_of(unionUnitLocal.tableList().begin(), unionUnitLocal.tableList().end(), + [&table](const auto& unionTable) { + return unionTable == table; + }); + return tableIsPresented; + }); +} + bool matchParallelCES(CalpontSelectExecutionPlan& csep) { - std::cout << csep.toString() << std::endl; auto tables = csep.tableList(); // This is leaf and there are no other tables at this level. for (auto& table : tables) { - if (!table.isColumnstore()) + if (!table.isColumnstore() && !tableIsInUnion(table, csep)) { return true; } @@ -9222,28 +9234,37 @@ bool matchParallelCES(CalpontSelectExecutionPlan& csep) return false; } -// CalpontSelectExecutionPlan tableIntoUnion(CalpontSelectExecutionPlan& table, CalpontSelectExecutionPlan& csep) -// { -// auto* unionCSEP = {new CalpontSelectExecutionPlan()}; -// CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList; -// CalpontSelectExecutionPlan::ColumnMap colMap; +CalpontSelectExecutionPlan::SelectList makeUnionFromTable(const size_t numberOfLegs, + CalpontSelectExecutionPlan& csep) +{ + CalpontSelectExecutionPlan::SelectList unionVec; + unionVec.reserve(numberOfLegs); + for (size_t i = 0; i < numberOfLegs; ++i) + { + unionVec.emplace_back(csep.cloneWORecursiveSelects()); + } -// unionSCEP.unionVec({csep}); -// return unionSCEP; -// } + return unionVec; +} void applyParallelCES(CalpontSelectExecutionPlan& csep) { + std::cout << "applyParallelCES" << std::endl; + std::cout << "original unionVec size " << csep.unionVec().size() << std::endl; + auto tables = csep.tableList(); for (auto it = tables.begin(); it != tables.end(); ++it) { if (!it->isColumnstore()) { - // auto unionSCEP = tableIntoUnion(*it, csep); - // tables.erase(it); - // csep.unionVec().push_back(unionSCEP); + size_t parallelFactor = 2; + auto additionalUnionVec = makeUnionFromTable(parallelFactor, csep); + csep.unionVec().insert(csep.unionVec().end(), additionalUnionVec.begin(), additionalUnionVec.end()); } } + + std::cout << "modified CSEP" << std::endl; + std::cout << "unionVec size " << csep.unionVec().size() << std::endl; } struct Rule @@ -9257,13 +9278,18 @@ struct Rule void (*apply)(CalpontSelectExecutionPlan&); bool walk(CalpontSelectExecutionPlan& csep) { + bool rewrite = false; for (auto& table : csep.derivedTableList()) { auto csepLocal = *dynamic_cast(table.get()); if (match(csepLocal)) { apply(csepLocal); - return true; + rewrite = true; + } + else + { + rewrite |= walk(csepLocal); } } @@ -9274,17 +9300,21 @@ struct Rule if (match(unionUnitLocal)) { apply(unionUnitLocal); - return true; + rewrite = true; + } + else + { + rewrite |= walk(unionUnitLocal); } } if (match(csep)) { apply(csep); - return true; + rewrite = true; } - return false; + return rewrite; } };