diff --git a/dbcon/mysql/ha_from_sub.cpp b/dbcon/mysql/ha_from_sub.cpp index 177887323..66e4d8648 100644 --- a/dbcon/mysql/ha_from_sub.cpp +++ b/dbcon/mysql/ha_from_sub.cpp @@ -444,6 +444,9 @@ SCSEP FromSubQuery::transform() return csep; } + // Insert column statistics + fGwip.mergeColumnStatisticsMap(gwi.columnStatisticsMap); + fGwip.subselectList.push_back(csep); return csep; } diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index aa3b604ca..486bb7979 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -49,7 +49,7 @@ using namespace logging; #define PREFER_MY_CONFIG_H #include #include "idb_mysql.h" -#include "opt_histogram_json.h" + #include "partition_element.h" #include "partition_info.h" @@ -6290,24 +6290,25 @@ int processLimitAndOffset(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi) { - if (!ifp->field->part_of_key.is_clear_all()) - { - return; - } - std::cout << "Processing field item: " << ifp->field_name.str << std::endl; + // TODO find clear way to check if the field is part of a key + // if (!ifp->field->part_of_key.is_clear_all()) + // { + // return; + // } + // std::cout << "Processing field item: " << ifp->field_name.str << std::endl; // std::cout << "part of a key: " << buf << std::endl; - std::cout << "ifp->field->field_index " << ifp->field->field_index << std::endl; + // std::cout << "ifp->field->field_index " << ifp->field->field_index << std::endl; for (uint j = 0; j < ifp->field->table->s->keys; j++) { for (uint i = 0; i < ifp->field->table->s->key_info[j].usable_key_parts; i++) { - std::cout << "key fieldnr " << i << " " - << ifp->field->table->s->key_info[j].key_part[i].field->field_name.str << " " - << ifp->field->table->s->key_info[j].key_part[i].fieldnr << std::endl; + // std::cout << "key fieldnr " << i << " " + // << ifp->field->table->s->key_info[j].key_part[i].field->field_name.str << " " + // << ifp->field->table->s->key_info[j].key_part[i].fieldnr << std::endl; if (ifp->field->table->s->key_info[j].key_part[i].fieldnr == ifp->field->field_index + 1) { - std::cout << "key_info " << j << " key_part " << i << " matched " << std::endl; + // std::cout << "key_info " << j << " key_part " << i << " matched " << std::endl; if (i == 0 && ifp->field->read_stats) { assert(ifp->field->table->s); @@ -6315,7 +6316,15 @@ void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi) // assert(ifp->field->table->s->table_name); // FQCN fqcn({ifp->field->table->s->db.str}, {ifp->field->table->s->table_name.str}, {ifp->field->field_name.str}); //TODO use FQCN as a key type - gwi.columnStatisticsMap[ifp->field->field_name.str] = ifp->field->read_stats->histogram->get_histogram(); + std::cout << "Adding column statistics for " << ifp->field->field_name.str << std::endl; + auto* histogram = dynamic_cast(ifp->field->read_stats->histogram); + if (histogram) + { + std::cout << "Type of histogram object: " << typeid(*histogram).name() << std::endl; + std::vector histogramBuckets = histogram->get_histogram(); + std::cout << "gwi.columnStatisticsMap[ifp->field->field_name.str].size() " << histogramBuckets.size() << std::endl; + gwi.columnStatisticsMap[ifp->field->field_name.str] = histogramBuckets; + } } } } @@ -6412,7 +6421,7 @@ int processSelect(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, vector { Item_field* ifp = (Item_field*)item; extractColumnStatistics(ifp, gwi); - + std::cout << "gwi.columnStatisticsMap 1 size " << gwi.columnStatisticsMap.size() << std::endl; if (ifp->field_name.length && string(ifp->field_name.str) == "*") { collectAllCols(gwi, ifp); @@ -7464,6 +7473,7 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& int status = getSelectPlan(gwi, select_lex, csep, false, true, isSelectLexUnit); + std::cout << "cs_get_select_plan columnStatisticsMap size " << gwi.columnStatisticsMap.size() << std::endl; if (status > 0) return ER_INTERNAL_ERROR; else if (status < 0) @@ -7481,7 +7491,8 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& if (get_unstable_optimizer(thd)) { - bool csepWasOptimized = optimizer::optimizeCSEP(*csep); + optimizer::RBOptimizerContext ctx(gwi); + bool csepWasOptimized = optimizer::optimizeCSEP(*csep, ctx); if (csep->traceOn() && csepWasOptimized) { cerr << "---------------- cs_get_select_plan optimized EXECUTION PLAN ----------------" << endl; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 797ac40c4..55aa4bf4a 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -135,6 +135,11 @@ using namespace funcexp; namespace cal_impl_if { extern bool nonConstFunc(Item_func* ifp); + +void gp_walk_info::mergeColumnStatisticsMap(const ColumnStatisticsMap& aColumnStatisticsMap) +{ + columnStatisticsMap.insert(aColumnStatisticsMap.begin(), aColumnStatisticsMap.end()); +} } namespace diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 46050713d..95335b7eb 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -102,6 +102,7 @@ typedef dmlpackage::TableValuesMap TableValuesMap; typedef std::map> TableMap; typedef std::tr1::unordered_map> TableOnExprList; typedef std::tr1::unordered_map TableOuterJoinMap; +using ColumnStatisticsMap = std::unordered_map>; struct gp_walk_info { @@ -232,6 +233,8 @@ struct gp_walk_info { } ~gp_walk_info(); + + void mergeColumnStatisticsMap(const std::unordered_map>& columnStatisticsMap); }; struct SubQueryChainHolder; diff --git a/dbcon/mysql/ha_select_sub.cpp b/dbcon/mysql/ha_select_sub.cpp index 3939c11b4..b2f39aa27 100644 --- a/dbcon/mysql/ha_select_sub.cpp +++ b/dbcon/mysql/ha_select_sub.cpp @@ -96,6 +96,12 @@ SCSEP SelectSubQuery::transform() return csep; } + // Insert column statistics + fGwip.mergeColumnStatisticsMap(gwi.columnStatisticsMap); + // std::cout << "fGwip.columnStatisticsMap 2 size " << fGwip.columnStatisticsMap.size() << std::endl; + // std::cout << "gwi.columnStatisticsMap 2 size " << gwi.columnStatisticsMap.size() << std::endl; + + // Insert subselect CSEP fGwip.subselectList.push_back(csep); // remove outer query tables diff --git a/dbcon/mysql/idb_mysql.h b/dbcon/mysql/idb_mysql.h index ba700ff93..2627c6fcf 100644 --- a/dbcon/mysql/idb_mysql.h +++ b/dbcon/mysql/idb_mysql.h @@ -67,6 +67,7 @@ #include "rpl_rli.h" #include "my_dbug.h" #include "sql_show.h" +#include "opt_histogram_json.h" #pragma GCC diagnostic pop // Now clean up the pollution as best we can... diff --git a/dbcon/mysql/rulebased_optimizer.cpp b/dbcon/mysql/rulebased_optimizer.cpp index ee674cfba..e1b6acd07 100644 --- a/dbcon/mysql/rulebased_optimizer.cpp +++ b/dbcon/mysql/rulebased_optimizer.cpp @@ -15,6 +15,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +#include +#include +#include +#include + +#include "rulebased_optimizer.h" + #include "constantcolumn.h" #include "execplan/calpontselectexecutionplan.h" #include "execplan/simplecolumn.h" @@ -23,44 +30,45 @@ #include "operator.h" #include "predicateoperator.h" #include "simplefilter.h" -#include "rulebased_optimizer.h" -#include -#include namespace optimizer { +void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, const size_t id); + static const std::string RewrittenSubTableAliasPrefix = "$added_sub_"; // Apply a list of rules to a CSEP -bool optimizeCSEPWithRules(execplan::CalpontSelectExecutionPlan& root, const std::vector& rules) +bool optimizeCSEPWithRules(execplan::CalpontSelectExecutionPlan& root, const std::vector& rules, + optimizer::RBOptimizerContext& ctx) { bool changed = false; for (const auto& rule : rules) { - changed |= rule.apply(root); + changed |= rule.apply(root, ctx); } return changed; } // high level API call for optimizer -bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root) +bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root, optimizer::RBOptimizerContext& ctx) { optimizer::Rule parallelCES{"parallelCES", optimizer::matchParallelCES, optimizer::applyParallelCES}; - std::vector rules = {parallelCES}; + std::vector rules = {parallelCES}; - return optimizeCSEPWithRules(root, rules); + return optimizeCSEPWithRules(root, rules, ctx); } // Apply iteratively until CSEP is converged by rule -bool Rule::apply(execplan::CalpontSelectExecutionPlan& root) const +bool Rule::apply(execplan::CalpontSelectExecutionPlan& root, optimizer::RBOptimizerContext& ctx) const { bool changedThisRound = false; bool hasBeenApplied = false; + do { - changedThisRound = walk(root); + changedThisRound = walk(root, ctx); hasBeenApplied |= changedThisRound; } while (changedThisRound && !applyOnlyOnce); @@ -68,69 +76,42 @@ bool Rule::apply(execplan::CalpontSelectExecutionPlan& root) const } // DFS walk to match CSEP and apply rules if match -bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep) const +bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx) const { bool rewrite = false; - for (auto& table : csep.derivedTableList()) + std::stack planStack; + planStack.push(&csep); + + while (!planStack.empty()) { - auto* csepPtr = dynamic_cast(table.get()); - if (!csepPtr) + execplan::CalpontSelectExecutionPlan* current = planStack.top(); + planStack.pop(); + + for (auto& table : current->derivedTableList()) { - continue; - } - - auto& csepLocal = *csepPtr; - rewrite |= walk(csepLocal); - } - - for (auto& unionUnit : csep.unionVec()) - { - auto* unionUnitPtr = dynamic_cast(unionUnit.get()); - if (!unionUnitPtr) - { - continue; - } - - auto& unionUnitLocal = *unionUnitPtr; - rewrite |= walk(unionUnitLocal); - } - - if (csep.filters() != nullptr) - { - bool rewriteLocal = false; - std::vector stack; - stack.push_back(csep.filters()); - while (!stack.empty()) - { - execplan::ParseTree* node = stack.back(); - stack.pop_back(); - if (node == nullptr) - continue; - - auto* existsFilter = dynamic_cast(node->data()); - if (existsFilter) + auto* csepPtr = dynamic_cast(table.get()); + if (csepPtr) { - if (matchRule(*existsFilter->sub())) - { - applyRule(*existsFilter->sub()); - rewriteLocal = true; - } + planStack.push(csepPtr); } - - if (node->right()) - stack.push_back(node->right()); - if (node->left()) - stack.push_back(node->left()); } - if (rewriteLocal) - rewrite |= rewriteLocal; - } - if (matchRule(csep)) - { - applyRule(csep); - rewrite = true; + for (auto& unionUnit : current->unionVec()) + { + auto* unionUnitPtr = dynamic_cast(unionUnit.get()); + if (unionUnitPtr) + { + planStack.push(unionUnitPtr); + } + } + + if (matchRule(*current)) + { + applyRule(*current, ctx); + ++ctx.uniqueId; + rewrite = true; + } } return rewrite; @@ -177,7 +158,7 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, auto* filterColLeftOp = new execplan::ConstantColumnUInt(bound.second, 0, 0); // set TZ // There is a question with ownership of the const column - // WIP here we lost upper bound value if predicate is not changed to weak lt + // WIP here we lost upper bound value if predicate is not changed to weak lt execplan::SOP ltOp = boost::make_shared(execplan::PredicateOperator("<")); ltOp->setOpType(filterColLeftOp->resultType(), tableKeyColumnLeftOp->resultType()); ltOp->resultType(ltOp->operationType()); @@ -210,14 +191,42 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, return ptp; } +execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep) +{ + return dynamic_cast(csep.returnedCols().front().get()); +} + execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( - const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep) + execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx) { execplan::CalpontSelectExecutionPlan::SelectList unionVec; - unionVec.reserve(numberOfLegs); - std::vector> bounds({{0, 3000961}, - {3000961, std::numeric_limits::max()}}); - for (auto bound : bounds) + // unionVec.reserve(numberOfLegs); + execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep); + std::cout << "looking for " << keyColumn->columnName() << " in ctx.gwi.columnStatisticsMap " << " with size " << ctx.gwi.columnStatisticsMap.size() << std::endl; + for (auto& [k, v] : ctx.gwi.columnStatisticsMap) + { + std::cout << "key " << k << std::endl; + } + if (!keyColumn || + ctx.gwi.columnStatisticsMap.find(keyColumn->columnName()) == ctx.gwi.columnStatisticsMap.end()) + { + return unionVec; + } + + auto columnStatistics = ctx.gwi.columnStatisticsMap[keyColumn->columnName()]; + std::cout << "columnStatistics.size() " << columnStatistics.size() << std::endl; + // TODO char and other numerical types support + std::vector> bounds; + std::transform(columnStatistics.begin(), columnStatistics.end(), std::back_inserter(bounds), + [](const auto& bucket) + { + uint64_t lowerBound = std::stoul(bucket.start_value); + uint64_t upperBound = lowerBound + bucket.ndv; + return std::make_pair(lowerBound, upperBound); + }); + // std::vector> bounds({{0, 3000961}, + // // {3000961, std::numeric_limits::max()}}); + for (auto& bound : bounds) { auto clonedCSEP = csep.cloneWORecursiveSelects(); // Add BETWEEN based on key column range @@ -227,8 +236,7 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( return unionVec; } - -void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep) +void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx) { auto tables = csep.tableList(); execplan::CalpontSelectExecutionPlan::TableList newTableList; @@ -242,16 +250,17 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep) { auto derivedSCEP = csep.cloneWORecursiveSelects(); // need to add a level here - std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table; + std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" + + std::to_string(ctx.uniqueId); derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM); derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS); derivedSCEP->derivedTbAlias(tableAlias); // TODO: hardcoded for now - size_t parallelFactor = 2; + // size_t parallelFactor = 2; // Create a copy of the current leaf CSEP with additional filters to partition the key column - auto additionalUnionVec = makeUnionFromTable(parallelFactor, csep); + auto additionalUnionVec = makeUnionFromTable(csep, ctx); derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(), additionalUnionVec.end()); @@ -275,11 +284,95 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep) execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, ""); newTableList.push_back(tn); // Remove the filters as they were pushed down to union units + // This is inappropriate for EXISTS filter and join conditions derivedSCEP->filters(nullptr); } } // Remove the filters as they were pushed down to union units - csep.filters(nullptr); + // This is inappropriate for EXISTS filter and join conditions + // csep.filters(nullptr); + // There must be no derived at this point. + csep.derivedTableList(newDerivedTableList); + // Replace table list with new table list populated with union units + csep.tableList(newTableList); + csep.returnedCols(newReturnedColumns); +} + +execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable_exists( + const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep) +{ + execplan::CalpontSelectExecutionPlan::SelectList unionVec; + unionVec.reserve(numberOfLegs); + std::vector> bounds( + {{0, 3000961}, {3000961, std::numeric_limits::max()}}); + for (auto bound : bounds) + { + auto clonedCSEP = csep.cloneWORecursiveSelects(); + clonedCSEP->filters(nullptr); + // Add BETWEEN based on key column range + clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound)); + unionVec.push_back(clonedCSEP); + } + + return unionVec; +} + +void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx) +{ + auto tables = csep.tableList(); + execplan::CalpontSelectExecutionPlan::TableList newTableList; + execplan::CalpontSelectExecutionPlan::SelectList newDerivedTableList; + execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns; + + // ATM Must be only 1 table + for (auto& table : tables) + { + if (!table.isColumnstore()) + { + auto derivedSCEP = csep.cloneWORecursiveSelects(); + // need to add a level here + std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" + + std::to_string(ctx.uniqueId); + + derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM); + derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS); + derivedSCEP->derivedTbAlias(tableAlias); + + // TODO: hardcoded for now + size_t parallelFactor = 2; + // Create a copy of the current leaf CSEP with additional filters to partition the key column + auto additionalUnionVec = makeUnionFromTable_exists(parallelFactor, csep); + derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(), + additionalUnionVec.end()); + + size_t colPosition = 0; + // change parent to derived table columns + for (auto& rc : csep.returnedCols()) + { + auto rcCloned = boost::make_shared(*rc); + // TODO timezone and result type are not copied + // TODO add specific ctor for this functionality + rcCloned->tableName(""); + rcCloned->schemaName(""); + rcCloned->tableAlias(tableAlias); + rcCloned->colPosition(colPosition++); + rcCloned->resultType(rc->resultType()); + + newReturnedColumns.push_back(rcCloned); + } + + newDerivedTableList.push_back(derivedSCEP); + execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, ""); + newTableList.push_back(tn); + // Remove the filters as they were pushed down to union units + // This is inappropriate for EXISTS filter and join conditions + // TODO if needed + derivedSCEP->filters(nullptr); + } + } + // Remove the filters as they were pushed down to union units + // This is inappropriate for EXISTS filter and join conditions + // csep.filters(nullptr); // There must be no derived at this point. csep.derivedTableList(newDerivedTableList); // Replace table list with new table list populated with union units diff --git a/dbcon/mysql/rulebased_optimizer.h b/dbcon/mysql/rulebased_optimizer.h index df0fa8556..65a8adcc6 100644 --- a/dbcon/mysql/rulebased_optimizer.h +++ b/dbcon/mysql/rulebased_optimizer.h @@ -18,14 +18,31 @@ #pragma once #include + +#define PREFER_MY_CONFIG_H +#include +#include "idb_mysql.h" + +#include "ha_mcs_impl_if.h" + #include "execplan/calpontselectexecutionplan.h" namespace optimizer { +class RBOptimizerContext { +public: + RBOptimizerContext() = delete; + RBOptimizerContext(cal_impl_if::gp_walk_info& walk_info) : gwi(walk_info) {} + // gwi lifetime should be longer than optimizer context. + // In plugin runtime this is always true. + cal_impl_if::gp_walk_info& gwi; + uint64_t uniqueId {0}; +}; + struct Rule { using RuleMatcher = bool (*)(execplan::CalpontSelectExecutionPlan&); - using RuleApplier = void (*)(execplan::CalpontSelectExecutionPlan&); + using RuleApplier = void (*)(execplan::CalpontSelectExecutionPlan&, RBOptimizerContext&); Rule(std::string&& name, RuleMatcher matchRule, RuleApplier applyRule) : name(name), matchRule(matchRule), applyRule(applyRule) {}; @@ -39,15 +56,21 @@ struct Rule Rule() = default; Rule(const Rule&) = default; Rule(Rule&&) = default; + + std::string getName() const + { + return name; + } + Rule& operator=(const Rule&) = default; Rule& operator=(Rule&&) = default; - bool apply(execplan::CalpontSelectExecutionPlan& csep) const; - bool walk(execplan::CalpontSelectExecutionPlan& csep) const; + bool apply(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx) const; + bool walk(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx) const; }; bool matchParallelCES(execplan::CalpontSelectExecutionPlan& csep); -void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep); -bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root); +void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx); +bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root, RBOptimizerContext& ctx); } \ No newline at end of file