1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-10-31 18:30:33 +03:00

feat(rbo,rule,QA): fixed JOIN example works with QA for one leg of the JOIN.

This commit is contained in:
drrtuy
2025-07-23 17:55:19 +00:00
parent a1ba7932cd
commit 2f9fec8057
8 changed files with 259 additions and 113 deletions

View File

@@ -202,7 +202,7 @@ std::string endlWithIndent(const size_t ident)
}
void CalpontSelectExecutionPlan::printSubCSEP(const size_t& ident, ostringstream& output,
CalpontSelectExecutionPlan*& plan) const
CalpontSelectExecutionPlan*& plan) const
{
if (plan)
{
@@ -243,7 +243,7 @@ string CalpontSelectExecutionPlan::toString(const size_t ident) const
for (unsigned int i = 0; i < retCols.size(); i++)
{
output << endlWithIndent(ident+2) << *retCols[i]; // WIP replace with constant
output << endlWithIndent(ident + 2) << *retCols[i]; // WIP replace with constant
if (retCols[i]->colSource() & SELECT_SUB)
{
@@ -257,7 +257,7 @@ string CalpontSelectExecutionPlan::toString(const size_t ident) const
// From Clause
CalpontSelectExecutionPlan::TableList tables = tableList();
output << endlWithIndent(ident) <<">>From Tables";
output << endlWithIndent(ident) << ">>From Tables";
seq = 0;
for (unsigned int i = 0; i < tables.size(); i++)
@@ -265,7 +265,7 @@ string CalpontSelectExecutionPlan::toString(const size_t ident) const
// derived table
if (tables[i].schema.length() == 0 && tables[i].table.length() == 0)
{
output << endlWithIndent(ident+2) << "derived table - " << tables[i].alias;
output << endlWithIndent(ident + 2) << "derived table - " << tables[i].alias;
CalpontSelectExecutionPlan* plan =
dynamic_cast<CalpontSelectExecutionPlan*>(fDerivedTableList[seq++].get());
@@ -273,7 +273,7 @@ string CalpontSelectExecutionPlan::toString(const size_t ident) const
}
else
{
output << endlWithIndent(ident+2) << tables[i];
output << endlWithIndent(ident + 2) << tables[i];
}
}
@@ -863,7 +863,8 @@ void CalpontSelectExecutionPlan::pron(std::string&& pron)
fPron = pron;
}
// This routine doesn't copy derived table list, union vector, select subqueries, subquery list, and subselects.
// This routine doesn't copy derived table list, union vector, select subqueries, subquery list, and
// subselects.
execplan::SCSEP CalpontSelectExecutionPlan::cloneWORecursiveSelects()
{
execplan::SCSEP newPlan(new CalpontSelectExecutionPlan(fLocation));
@@ -966,4 +967,122 @@ execplan::SCSEP CalpontSelectExecutionPlan::cloneWORecursiveSelects()
return newPlan;
}
// Produces a new CSEP derived from this one but narrowed to a single table:
//  - scalar settings are copied member by member;
//  - the projection keeps only clones of SimpleColumns that belong to targetTableAlias;
//  - filters, having, group by, order by and the column map are copied/cloned as they are;
//  - the table list is replaced by a list holding just targetTableAlias.
// Nothing else (derived tables, union units, subselects) is set on the new plan here.
// INV the target table must be contained in the original TableList of this CSEP.
execplan::SCSEP CalpontSelectExecutionPlan::cloneForTableWORecursiveSelects(
    const execplan::CalpontSystemCatalog::TableAliasName& targetTableAlias)
{
  execplan::SCSEP newPlan(new CalpontSelectExecutionPlan(fLocation));
  auto& dst = *newPlan;

  // Member-wise copy of the simple settings.
  dst.fLocalQuery = fLocalQuery;
  dst.fTableAlias = fTableAlias;
  dst.fLocation = fLocation;
  dst.fDependent = fDependent;
  dst.fData = fData;
  dst.fSessionID = fSessionID;
  dst.fTxnID = fTxnID;
  dst.fVerID = fVerID;
  dst.fSchemaName = fSchemaName;
  dst.fTableName = fTableName;
  dst.fTraceFlags = fTraceFlags;
  dst.fStatementID = fStatementID;
  dst.fDistinct = fDistinct;
  dst.fOverrideLargeSideEstimate = fOverrideLargeSideEstimate;
  dst.fDistinctUnionNum = fDistinctUnionNum;
  dst.fSubType = fSubType;
  dst.fDerivedTbAlias = fDerivedTbAlias;
  dst.fDerivedTbView = fDerivedTbView;
  dst.fLimitStart = fLimitStart;
  dst.fLimitNum = fLimitNum;
  dst.fHasOrderBy = fHasOrderBy;
  dst.fStringScanThreshold = fStringScanThreshold;
  dst.fQueryType = fQueryType;
  dst.fPriority = fPriority;
  dst.fStringTableThreshold = fStringTableThreshold;
  dst.fSpecHandlerProcessed = fSpecHandlerProcessed;
  dst.fOrderByThreads = fOrderByThreads;
  dst.fUuid = fUuid;
  dst.fDJSSmallSideLimit = fDJSSmallSideLimit;
  dst.fDJSLargeSideLimit = fDJSLargeSideLimit;
  dst.fDJSPartitionSize = fDJSPartitionSize;
  dst.fDJSMaxPartitionTreeDepth = fDJSMaxPartitionTreeDepth;
  dst.fDJSForceRun = fDJSForceRun;
  dst.fMaxPmJoinResultCount = fMaxPmJoinResultCount;
  dst.fUMMemLimit = fUMMemLimit;
  dst.fIsDML = fIsDML;
  dst.fTimeZone = fTimeZone;
  dst.fPron = fPron;
  dst.fWithRollup = fWithRollup;

  // Projection: only SimpleColumns owned by the target table survive; every other returned
  // column (non-SimpleColumn or belonging to another table) is left out of the clone.
  ReturnedColumnList targetCols;
  for (const auto& rc : fReturnedCols)
  {
    auto* sc = dynamic_cast<execplan::SimpleColumn*>(rc.get());
    if (!sc)
    {
      continue;
    }

    const execplan::CalpontSystemCatalog::TableAliasName owner(sc->schemaName(), sc->tableName(),
                                                               sc->tableAlias(), "", false);
    if (targetTableAlias.weakerEq(owner))
    {
      targetCols.push_back(SRCP(rc->clone()));
    }
  }
  dst.returnedCols(targetCols);

  // Filters are copy-constructed as a whole.
  // WIP only filters that apply to the target table must be left intact,
  // all irrelevant branches are to be replaced with true.
  if (fFilters)
  {
    dst.filters(new ParseTree(*fFilters));
  }

  // Token lists are handed over unchanged.
  dst.filterTokenList(fFilterTokenList);
  dst.havingTokenList(fHavingTokenList);

  // GROUP BY: clone every column.
  GroupByColumnList groupByClones;
  for (const auto& groupCol : fGroupByCols)
  {
    groupByClones.push_back(SRCP(groupCol->clone()));
  }
  dst.groupByCols(groupByClones);

  // HAVING is copy-constructed when present.
  if (fHaving)
  {
    dst.having(new ParseTree(*fHaving));
  }

  // ORDER BY: clone every column.
  OrderByColumnList orderByClones;
  for (const auto& orderCol : fOrderByCols)
  {
    orderByClones.push_back(SRCP(orderCol->clone()));
  }
  dst.orderByCols(orderByClones);

  // Column map: clone every non-null entry.
  // WIP only relevant RCs must be copied
  ColumnMap columnMapClones;
  for (const auto& [columnKey, column] : fColumnMap)
  {
    if (!column)
    {
      continue;
    }
    columnMapClones.insert(ColumnMap::value_type(columnKey, SRCP(column->clone())));
  }
  dst.columnMap(columnMapClones);

  // RM parameters are copied as they are.
  dst.rmParms(frmParms);

  // The clone reads from the target table only.
  TableList singleTable;
  singleTable.push_back(targetTableAlias);
  dst.tableList(singleTable);

  return newPlan;
}
} // namespace execplan

View File

@@ -163,6 +163,9 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
* Clones this CSEP without recursive selects for optimizer purposes
*/
execplan::SCSEP cloneWORecursiveSelects();
execplan::SCSEP cloneForTableWORecursiveSelects(const execplan::CalpontSystemCatalog::TableAliasName& targetTableAlias);
/**
* Access and mutator methods
*/

View File

@@ -437,7 +437,8 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
: schema(sch), table(tb), alias(al), view(v), fisColumnStore(true)
{
}
TableAliasName(const std::string& sch, const std::string& tb, const std::string& al, const std::string& v, const bool isColumnStore)
TableAliasName(const std::string& sch, const std::string& tb, const std::string& al, const std::string& v,
const bool isColumnStore)
: schema(sch), table(tb), alias(al), view(v), fisColumnStore(isColumnStore)
{
}
@@ -458,6 +459,11 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
return (schema == rhs.schema && table == rhs.table && alias == rhs.alias && view == rhs.view &&
partitions == rhs.partitions && fisColumnStore == rhs.fisColumnStore);
}
// Relaxed comparison: two aliases match when schema, table, alias, view and the ColumnStore
// flag agree. Unlike the stricter comparison right above, partitions are not looked at.
bool weakerEq(const TableAliasName& rhs) const
{
  const bool sameNames = schema == rhs.schema && table == rhs.table && alias == rhs.alias;
  if (!sameNames)
  {
    return false;
  }
  return view == rhs.view && fisColumnStore == rhs.fisColumnStore;
}
bool operator!=(const TableAliasName& rhs) const
{
return !(*this == rhs);

View File

@@ -139,7 +139,7 @@ extern bool nonConstFunc(Item_func* ifp);
void gp_walk_info::mergeTableStatistics(const TableStatisticsMap& aTableStatisticsMap)
{
for (auto& [schemaAndTableName, aColumnStatisticsMap]: aTableStatisticsMap)
for (auto& [schemaAndTableName, aColumnStatisticsMap] : aTableStatisticsMap)
{
auto tableStatisticsMapIt = tableStatisticsMap.find(schemaAndTableName);
if (tableStatisticsMapIt == tableStatisticsMap.end())
@@ -148,7 +148,7 @@ void gp_walk_info::mergeTableStatistics(const TableStatisticsMap& aTableStatisti
}
else
{
for (auto& [columnName, histogram]: aColumnStatisticsMap)
for (auto& [columnName, histogram] : aColumnStatisticsMap)
{
tableStatisticsMapIt->second[columnName] = histogram;
}
@@ -156,9 +156,16 @@ void gp_walk_info::mergeTableStatistics(const TableStatisticsMap& aTableStatisti
}
}
std::optional<ColumnStatisticsMap> gp_walk_info::findStatisticsForATable(SchemaAndTableName& schemaAndTableName)
std::optional<ColumnStatisticsMap> gp_walk_info::findStatisticsForATable(
SchemaAndTableName& schemaAndTableName)
{
auto tableStatisticsMapIt = tableStatisticsMap.find(schemaAndTableName);
for (auto& [schemaAndTableName, columnStatisticsMap] : tableStatisticsMap)
{
std::cout << "Table " << schemaAndTableName.schema << "." << schemaAndTableName.table
<< " has statistics " << columnStatisticsMap.size() << std::endl;
}
if (tableStatisticsMapIt == tableStatisticsMap.end())
{
return std::nullopt;
@@ -167,7 +174,7 @@ std::optional<ColumnStatisticsMap> gp_walk_info::findStatisticsForATable(SchemaA
return {tableStatisticsMapIt->second};
}
}
} // namespace cal_impl_if
namespace
{

View File

@@ -38,13 +38,6 @@ void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, const s
static const std::string RewrittenSubTableAliasPrefix = "$added_sub_";
static const size_t MaxParallelFactor = 16;
// Tells whether two table references agree on schema, table, alias and the ColumnStore flag.
// The view name and partitions do not take part in the comparison.
bool tableAliasEqual(const execplan::CalpontSystemCatalog::TableAliasName& lhs,
                     const execplan::CalpontSystemCatalog::TableAliasName& rhs)
{
  const bool sameNames = lhs.schema == rhs.schema && lhs.table == rhs.table && lhs.alias == rhs.alias;
  if (!sameNames)
  {
    return false;
  }
  return lhs.fisColumnStore == rhs.fisColumnStore;
}
bool tableIsInUnion(const execplan::CalpontSystemCatalog::TableAliasName& table,
execplan::CalpontSelectExecutionPlan& csep)
{
@@ -66,14 +59,26 @@ bool someAreForeignTables(execplan::CalpontSelectExecutionPlan& csep)
[](const auto& table) { return !table.isColumnstore(); });
}
bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep)
// Reports whether at least one non-ColumnStore table in csep's table list has an entry in the
// table statistics map kept in ctx.gwi.
bool someForeignTablesHasIndex(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
{
  auto& knownStatistics = ctx.gwi.tableStatisticsMap;
  for (const auto& table : csep.tableList())
  {
    // ColumnStore tables are not candidates.
    if (table.isColumnstore())
    {
      continue;
    }

    cal_impl_if::SchemaAndTableName statisticsKey = {table.schema, table.table};
    if (knownStatistics.find(statisticsKey) != knownStatistics.end())
    {
      return true;
    }
  }
  return false;
}
// Match predicate for the parallel CES rule.
// Returns true when csep references at least one foreign (non-ColumnStore) table and at least
// one such table has an entry in the statistics map reachable through ctx
// (see someForeignTablesHasIndex).
bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
{
  // TODO filter out CSEPs with orderBy, groupBy, having
  // TODO filter out tables that were re-written.
  // A stricter variant would accept only a leaf with a single foreign table that is part of
  // neither a UNION nor a derived table:
  // return csep.tableList().size() == 1 && !csep.tableList()[0].isColumnstore() &&
  //        !tableIsInUnion(csep.tableList()[0], csep);
  return someAreForeignTables(csep) && someForeignTablesHasIndex(csep, ctx);
}
// This routine produces a new ParseTree that is AND(lowerBand <= column, column <= upperBand)
@@ -135,7 +140,7 @@ execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPl
{
execplan::CalpontSystemCatalog::TableAliasName rcTable(
simpleColumn->schemaName(), simpleColumn->tableName(), simpleColumn->tableAlias(), "", false);
if (!tableAliasEqual(targetTable, rcTable))
if (!targetTable.weakerEq(rcTable))
{
continue;
}
@@ -217,7 +222,7 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
for (auto& bound : bounds)
{
auto clonedCSEP = csep.cloneWORecursiveSelects();
auto clonedCSEP = csep.cloneForTableWORecursiveSelects(table);
// Add BETWEEN based on key column range
clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, *keyColumn, bound));
unionVec.push_back(clonedCSEP);
@@ -239,10 +244,22 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
std::cout << "Processing table schema " << schemaAndTableName.schema << " table "
<< schemaAndTableName.table << " alias " << table.alias << std::endl;
auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName);
std::cout << "Column statistics: " << columnStatistics.has_value() << std::endl;
// TODO add column statistics check to the corresponding match
if (!table.isColumnstore() && columnStatistics)
{
auto derivedSCEP = csep.cloneWORecursiveSelects();
auto derivedSCEP = csep.cloneForTableWORecursiveSelects(table);
// Remove the filters as they were pushed down to union units
// This is inappropriate for EXISTS filter and join conditions
// WIP replace with filters applied to filters, so that only relevant filters are left
derivedSCEP->filters(nullptr);
auto* derivedCSEP = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(derivedSCEP.get());
if (!derivedCSEP)
{
continue;
}
auto additionalUnionVec = makeUnionFromTable(*derivedCSEP, table, ctx);
// need to add a level here
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
std::to_string(ctx.uniqueId);
@@ -252,101 +269,95 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
derivedSCEP->derivedTbAlias(tableAlias);
// Create a copy of the current leaf CSEP with additional filters to partition the key column
auto additionalUnionVec = makeUnionFromTable(csep, table, ctx);
derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
additionalUnionVec.end());
newDerivedTableList.push_back(derivedSCEP);
execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
newTableList.push_back(tn);
// Remove the filters as they were pushed down to union units
// This is inappropriate for EXISTS filter and join conditions
derivedSCEP->filters(nullptr);
ruleHasBeenApplied = true;
}
else
{
newTableList.push_back(table);
}
}
execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
// [[maybe_unused]] size_t colPosition = 0;
// replace parent CSEP RCs with derived table RCs using SchemaAndTableName -> tableAlias map
for (auto& rc : csep.returnedCols())
if (!newDerivedTableList.empty())
{
// TODO support expressions
// Find SC for the RC
auto rcCloned = boost::make_shared<execplan::SimpleColumn>(*rc);
// TODO timezone and result type are not copied
// TODO add specific ctor for this functionality
// If there is an alias in the map then it is a new derived table
auto sc = dynamic_cast<execplan::SimpleColumn*>(rc.get());
std::vector<execplan::SimpleColumn*> scs;
// execplan::ParseTree pt(rc.get());
// pt.walk(execplan::getSimpleCols, &scs);
// auto sc = scs[0];
std::cout << "Processing RC schema " << sc->schemaName() << " table " << sc->tableName() << " alias "
<< sc->tableAlias() << std::endl;
auto newTableAliasAndColPositionCounter =
tableAliasMap.find({sc->schemaName(), sc->tableName(), sc->tableAlias(), "", false});
if (newTableAliasAndColPositionCounter == tableAliasMap.end())
for (auto& rc : csep.returnedCols())
{
std::cout << "The RC doesn't belong to any of the derived tables, so leave it intact" << std::endl;
continue;
// TODO support expressions
// Find SC for the RC
auto rcCloned = boost::make_shared<execplan::SimpleColumn>(*rc);
// TODO timezone and result type are not copied
// TODO add specific ctor for this functionality
// If there is an alias in the map then it is a new derived table
auto sc = dynamic_cast<execplan::SimpleColumn*>(rc.get());
std::vector<execplan::SimpleColumn*> scs;
// execplan::ParseTree pt(rc.get());
// pt.walk(execplan::getSimpleCols, &scs);
std::cout << "Processing RC schema " << sc->schemaName() << " table " << sc->tableName() << " alias "
<< sc->tableAlias() << std::endl;
for (auto& [tableAlias, aliasAndCounter] : tableAliasMap)
{
std::cout << "Processing table alias " << tableAlias << " new alias " << aliasAndCounter.first
<< " col position " << aliasAndCounter.second << std::endl;
}
auto newTableAliasAndColPositionCounter =
tableAliasMap.find({sc->schemaName(), sc->tableName(), sc->tableAlias(), "", false});
if (newTableAliasAndColPositionCounter == tableAliasMap.end())
{
std::cout << "The RC doesn't belong to any of the derived tables, so leave it intact" << std::endl;
continue;
}
sc->tableName("");
sc->schemaName("");
auto& [newTableAlias, colPosition] = newTableAliasAndColPositionCounter->second;
sc->tableAlias(newTableAlias);
// WIP Not needed according with CSEP output
// sc->isColumnStore(true);
sc->colPosition(colPosition++);
// rcCloned->colPosition(colPosition++);
// rcCloned->resultType(rc->resultType());
// newReturnedColumns.push_back(rcCloned);
}
sc->tableName("");
sc->schemaName("");
auto& [newTableAlias, colPosition] = newTableAliasAndColPositionCounter->second;
sc->tableAlias(newTableAlias);
// WIP Not needed according with CSEP output
// sc->isColumnStore(true);
sc->colPosition(colPosition++);
// rcCloned->colPosition(colPosition++);
// rcCloned->resultType(rc->resultType());
// newReturnedColumns.push_back(rcCloned);
}
// Remove the filters if necessary using csep.filters(nullptr) as they were pushed down to union units
// But this is inappropriate for EXISTS filter and join conditions
// There must be no derived at this point, so we can replace it with the new derived table list
execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
[[maybe_unused]] size_t colPosition = 0;
// replace parent CSEP RCs with derived table RCs using SchemaAndTableName -> tableAlias map
for (auto& rc : csep.returnedCols())
{
// TODO support expressions
// Find SC for the RC
auto rcCloned = boost::make_shared<execplan::SimpleColumn>(*rc);
// TODO timezone and result type are not copied
// TODO add specific ctor for this functionality
// If there is an alias in the map then it is a new derived table
auto sc = dynamic_cast<execplan::SimpleColumn*>(rc.get());
std::vector<execplan::SimpleColumn*> scs;
// execplan::ParseTree pt(rc.get());
// pt.walk(execplan::getSimpleCols, &scs);
// auto sc = scs[0];
std::cout << "Processing RC schema " << sc->schemaName() << " table " << sc->tableName() << " alias "
<< sc->tableAlias() << std::endl;
auto newTableAlias = tableAliasMap.find(
{sc->schemaName(), sc->tableName(), sc->tableAlias(), "", false});
if (newTableAlias == tableAliasMap.end())
auto* left = dynamic_cast<execplan::SimpleFilter*>(csep.filters()->data());
if (left)
{
std::cout << "The RC doesn't belong to any of the derived tables, so leave it intact" << std::endl;
continue;
auto* lhs = left->lhs()->clone();
if (lhs)
{
auto* lhsSC = dynamic_cast<execplan::SimpleColumn*>(lhs);
if (lhsSC)
{
auto newTableAlias = tableAliasMap.find({lhsSC->schemaName(), lhsSC->tableName(), lhsSC->tableAlias(), "", false});
// WIP Leak: losing previous lhs
if (newTableAlias != tableAliasMap.end())
{
lhsSC->tableName("");
lhsSC->schemaName("");
lhsSC->tableAlias(newTableAlias->second.first);
lhsSC->colPosition(0);
left->lhs(lhs);
}
}
}
}
sc->tableName("");
sc->schemaName("");
sc->tableAlias(newTableAlias->second);
sc->isColumnStore(true);
sc->colPosition(colPosition++);
// rcCloned->colPosition(colPosition++);
// rcCloned->resultType(rc->resultType());
// newReturnedColumns.push_back(rcCloned);
csep.derivedTableList(newDerivedTableList);
// Replace table list with new table list populated with union units
csep.tableList(newTableList);
// csep.returnedCols(newReturnedColumns);
}
// Remove the filters if necessary using csep.filters(nullptr) as they were pushed down to union units
// But this is inappropriate for EXISTS filter and join conditions
// There must be no derived at this point, so we can replace it with the new derived table list
csep.derivedTableList(newDerivedTableList);
// Replace table list with new table list populated with union units
csep.tableList(newTableList);
csep.returnedCols(newReturnedColumns);
return ruleHasBeenApplied;
}

View File

@@ -58,6 +58,6 @@ using NewTableAliasAndColumnPosCounter = std::pair<string, size_t>;
using TableAliasMap = std::map<execplan::CalpontSystemCatalog::TableAliasName,
NewTableAliasAndColumnPosCounter, TableAliasLessThan>;
bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep);
bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx);
bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx);
}
} // namespace optimizer

View File

@@ -130,7 +130,7 @@ bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimiz
// TODO add walking nested subselect in projection. See CSEP::fSelectSubList
if (mayApply(*current))
if (mayApply(*current, ctx))
{
rewrite |= applyRule(*current, ctx);
++ctx.uniqueId;

View File

@@ -44,7 +44,7 @@ public:
struct Rule
{
// returns true if rule may be applied
using RuleApplierFilter = bool (*)(execplan::CalpontSelectExecutionPlan&);
using RuleApplierFilter = bool (*)(execplan::CalpontSelectExecutionPlan&, RBOptimizerContext&);
// returns true if rule was applied
using RuleApplier = bool (*)(execplan::CalpontSelectExecutionPlan&, RBOptimizerContext&);