From b07ee73fb39dbd0b80960d385b6e0f8ea3102ea3 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Sat, 26 Jul 2025 10:29:50 +0000 Subject: [PATCH] feat(rbo,rules,QA): filtered RC clone for UNION units. --- dbcon/execplan/calpontselectexecutionplan.cpp | 15 +- dbcon/execplan/returnedcolumn.h | 2 +- dbcon/execplan/simplecolumn.cpp | 5 + dbcon/execplan/simplecolumn.h | 3 + dbcon/mysql/ha_mcs_execplan.cpp | 10 +- dbcon/mysql/rbo_apply_parallel_ces.cpp | 144 +++++++++++------- 6 files changed, 114 insertions(+), 65 deletions(-) diff --git a/dbcon/execplan/calpontselectexecutionplan.cpp b/dbcon/execplan/calpontselectexecutionplan.cpp index 2eb1459f0..723ea9e5c 100644 --- a/dbcon/execplan/calpontselectexecutionplan.cpp +++ b/dbcon/execplan/calpontselectexecutionplan.cpp @@ -1017,11 +1017,13 @@ execplan::SCSEP CalpontSelectExecutionPlan::cloneForTableWORecursiveSelects( ReturnedColumnList newReturnedCols; for (const auto& rc : fReturnedCols) { - auto* simpleColumn = dynamic_cast(rc.get()); - if (simpleColumn) + rc->setSimpleColumnList(); + for (auto* simpleColumn : rc->simpleColumnList()) { + // TODO check that is columnstore is correct execplan::CalpontSystemCatalog::TableAliasName rcTable( - simpleColumn->schemaName(), simpleColumn->tableName(), simpleColumn->tableAlias(), "", false); + simpleColumn->schemaName(), simpleColumn->tableName(), simpleColumn->tableAlias(), "", + simpleColumn->isColumnStore()); if (!targetTableAlias.weakerEq(rcTable)) { continue; @@ -1029,6 +1031,11 @@ execplan::SCSEP CalpontSelectExecutionPlan::cloneForTableWORecursiveSelects( newReturnedCols.push_back(SRCP(rc->clone())); } } + if (newReturnedCols.empty()) + { + std::cout << "cloneForTableWORecursiveSelects(): there are no Returned Columns after table filtering." + << std::endl; + } newPlan->returnedCols(newReturnedCols); // Deep copy of filters @@ -1059,7 +1066,7 @@ execplan::SCSEP CalpontSelectExecutionPlan::cloneForTableWORecursiveSelects( OrderByColumnList newOrderByCols; for (const auto& col : fOrderByCols) { - newOrderByCols.push_back(SRCP(col->clone())); + newOrderByCols.push_back(SRCP(col->clone())); } newPlan->orderByCols(newOrderByCols); diff --git a/dbcon/execplan/returnedcolumn.h b/dbcon/execplan/returnedcolumn.h index fa84ca12a..ddf06bec4 100644 --- a/dbcon/execplan/returnedcolumn.h +++ b/dbcon/execplan/returnedcolumn.h @@ -288,7 +288,7 @@ class ReturnedColumn : public TreeNode /* @brief traverse this ReturnedColumn and re-populate fSimpleColumnList. * * @note all ReturnedColumns that may have simple column arguments added - * to the list need to implement thhis function. + * to the list need to implement this function. */ virtual void setSimpleColumnList(); diff --git a/dbcon/execplan/simplecolumn.cpp b/dbcon/execplan/simplecolumn.cpp index a68fd6d02..bb14867c6 100644 --- a/dbcon/execplan/simplecolumn.cpp +++ b/dbcon/execplan/simplecolumn.cpp @@ -751,4 +751,9 @@ void SimpleColumn::evaluate(Row& row, bool& isNull) } } +void SimpleColumn::setSimpleColumnList() +{ + fSimpleColumnList.push_back(this); +} + } // namespace execplan diff --git a/dbcon/execplan/simplecolumn.h b/dbcon/execplan/simplecolumn.h index 863875013..8064cdc0f 100644 --- a/dbcon/execplan/simplecolumn.h +++ b/dbcon/execplan/simplecolumn.h @@ -264,6 +264,9 @@ class SimpleColumn : public ReturnedColumn */ bool singleTable(CalpontSystemCatalog::TableAliasName& tan) override; + void setSimpleColumnList() override; + + protected: /** * Fields diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 11d29a3af..62b7b1c12 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -6307,7 +6307,15 @@ void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi) if (histogram) { SchemaAndTableName tableName = {ifp->field->table->s->db.str, ifp->field->table->s->table_name.str}; - gwi.tableStatisticsMap[tableName][ifp->field->field_name.str] = *histogram; + auto tableStatisticsMapIt = gwi.tableStatisticsMap.find(tableName); + if (tableStatisticsMapIt == gwi.tableStatisticsMap.end()) + { + gwi.tableStatisticsMap.insert({tableName, {{ifp->field->field_name.str, *histogram}}}); + } + else + { + tableStatisticsMapIt->second.insert({ifp->field->field_name.str, *histogram}); + } } } } diff --git a/dbcon/mysql/rbo_apply_parallel_ces.cpp b/dbcon/mysql/rbo_apply_parallel_ces.cpp index e3252db08..782571331 100644 --- a/dbcon/mysql/rbo_apply_parallel_ces.cpp +++ b/dbcon/mysql/rbo_apply_parallel_ces.cpp @@ -18,6 +18,8 @@ #include #include #include +#include +#include #include "rulebased_optimizer.h" @@ -33,6 +35,9 @@ namespace optimizer { +template +using FilterRangeBounds = std::vector>; + void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, const size_t id); static const std::string RewrittenSubTableAliasPrefix = "$added_sub_"; @@ -163,6 +168,55 @@ execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPl return nullptr; } +// Populates range bounds based on column statistics +// Returns optional with bounds if successful, nullopt otherwise +template +std::optional> populateRangeBounds(execplan::SimpleColumn* keyColumn, + optimizer::RBOptimizerContext& ctx) +{ + cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()}; + auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName); + if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end()) + { + return std::nullopt; + } + + auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName()); + if (columnStatisticsIt == tableColumnsStatisticsIt->second.end()) + { + return std::nullopt; + } + + auto columnStatistics = columnStatisticsIt->second; + + // TODO configurable parallel factor via session variable + // NB now histogram size is the way to control parallel factor with 16 being the maximum + size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor); + size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits; + + FilterRangeBounds bounds; + + // Loop over buckets to produce filter ranges + for (size_t i = 0; i < numberOfUnionUnits - 1; ++i) + { + auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit; + auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit; + T currentLowerBound = *(uint32_t*)bucket->start_value.data(); + T currentUpperBound = *(uint32_t*)endBucket->start_value.data(); + bounds.push_back({currentLowerBound, currentUpperBound}); + } + + // Add last range + // NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start + auto lastBucket = + columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit; + T currentLowerBound = *(uint32_t*)lastBucket->start_value.data(); + T currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data(); + bounds.push_back({currentLowerBound, currentUpperBound}); + + return bounds; +} + // TODO char and other numerical types support execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( execplan::CalpontSelectExecutionPlan& csep, execplan::CalpontSystemCatalog::TableAliasName& table, @@ -179,46 +233,12 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( } // TODO char and other numerical types support - std::vector> bounds; + auto boundsOpt = populateRangeBounds(keyColumn, ctx); + if (!boundsOpt.has_value()) { - cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()}; - auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName); - if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end()) - { - return unionVec; - } - - auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName()); - if (columnStatisticsIt == tableColumnsStatisticsIt->second.end()) - { - return unionVec; - } - - auto columnStatistics = columnStatisticsIt->second; - - // TODO configurable parallel factor via session variable - // NB now histogram size is the way to control parallel factor with 16 being the maximum - size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor); - size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits; - - // Loop over buckets to produce filter ranges - for (size_t i = 0; i < numberOfUnionUnits - 1; ++i) - { - auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit; - auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit; - uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data(); - uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data(); - bounds.push_back({currentLowerBound, currentUpperBound}); - } - - // Add last range - // NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start - auto lastBucket = columnStatistics.get_json_histogram().begin() + - (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit; - uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data(); - uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data(); - bounds.push_back({currentLowerBound, currentUpperBound}); + return unionVec; } + auto& bounds = boundsOpt.value(); for (auto& bound : bounds) { @@ -243,15 +263,19 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon cal_impl_if::SchemaAndTableName schemaAndTableName = {table.schema, table.table}; std::cout << "Processing table schema " << schemaAndTableName.schema << " table " << schemaAndTableName.table << " alias " << table.alias << std::endl; - auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName); - std::cout << "Column statistics: " << columnStatistics.has_value() << std::endl; + auto anyColumnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName); + std::cout << "Column statistics: " << anyColumnStatistics.has_value() << std::endl; // TODO add column statistics check to the corresponding match - if (!table.isColumnstore() && columnStatistics) + if (!table.isColumnstore() && anyColumnStatistics) { + // Don't copy filters for this auto derivedSCEP = csep.cloneForTableWORecursiveSelects(table); // Remove the filters as they were pushed down to union units // This is inappropriate for EXISTS filter and join conditions // WIP replace with filters applied to filters, so that only relevant filters are left + // WIP Ugly hack to avoid leaks + auto unusedFilters = derivedSCEP->filters(); + delete unusedFilters; derivedSCEP->filters(nullptr); auto* derivedCSEP = dynamic_cast(derivedSCEP.get()); if (!derivedCSEP) @@ -283,8 +307,7 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon } } - execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns; - // [[maybe_unused]] size_t colPosition = 0; + // execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns; // replace parent CSEP RCs with derived table RCs using ScheamAndTableName -> tableAlias map if (!newDerivedTableList.empty()) { @@ -298,8 +321,6 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon // If there is an alias in the map then it is a new derived table auto sc = dynamic_cast(rc.get()); std::vector scs; - // execplan::ParseTree pt(rc.get()); - // pt.walk(execplan::getSimpleCols, &scs); std::cout << "Processing RC schema " << sc->schemaName() << " table " << sc->tableName() << " alias " << sc->tableAlias() << std::endl; @@ -330,24 +351,29 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon // But this is inappropriate for EXISTS filter and join conditions // There must be no derived at this point, so we can replace it with the new derived table list - auto* left = dynamic_cast(csep.filters()->data()); - if (left) + // WIP hardcoded query + if (csep.filters() && csep.filters()->data()) { - auto* lhs = left->lhs()->clone(); - if (lhs) + auto* left = dynamic_cast(csep.filters()->data()); + if (left) { - auto* lhsSC = dynamic_cast(lhs); - if (lhsSC) + auto* lhs = left->lhs()->clone(); + if (lhs) { - auto newTableAlias = tableAliasMap.find({lhsSC->schemaName(), lhsSC->tableName(), lhsSC->tableAlias(), "", false}); - // WIP Leak loosing previous lhs - if (newTableAlias != tableAliasMap.end()) + auto* lhsSC = dynamic_cast(lhs); + if (lhsSC) { - lhsSC->tableName(""); - lhsSC->schemaName(""); - lhsSC->tableAlias(newTableAlias->second.first); - lhsSC->colPosition(0); - left->lhs(lhs); + auto newTableAlias = + tableAliasMap.find({lhsSC->schemaName(), lhsSC->tableName(), lhsSC->tableAlias(), "", false}); + // WIP Leak loosing previous lhs + if (newTableAlias != tableAliasMap.end()) + { + lhsSC->tableName(""); + lhsSC->schemaName(""); + lhsSC->tableAlias(newTableAlias->second.first); + lhsSC->colPosition(0); + left->lhs(lhs); + } } } }