diff --git a/dbcon/mysql/ha_mcs_client_udfs.cpp b/dbcon/mysql/ha_mcs_client_udfs.cpp
index e7a57c70b..3025a33d7 100644
--- a/dbcon/mysql/ha_mcs_client_udfs.cpp
+++ b/dbcon/mysql/ha_mcs_client_udfs.cpp
@@ -924,6 +924,115 @@ extern "C"
   {
   }
 
+  // --- Plan UDFs ---
+  // mcsgetplan([selector]) returns one of the strings kept in cal_connection_info,
+  // chosen by the optional selector:
+  //   no argument      -> post-RBO (optimized) plan
+  //   0 or 'original'  -> pre-RBO plan ('orig' is accepted too)
+  //   1 or 'optimized' -> post-RBO plan ('opt' is accepted too)
+  //   2 or 'rules'     -> comma-separated applied RBO rules
+  // String selectors are compared case-insensitively. A NULL or unrecognized selector
+  // returns the text "Wrong argument produced". An empty selection returns SQL NULL,
+  // and the reported length is capped at TraceSize.
+  const char* mcsgetplan(UDF_INIT* /*initid*/, UDF_ARGS* args, char* /*result*/, unsigned long* length,
+                         char* is_null, char* /*error*/)
+  {
+    if (get_fe_conn_info_ptr() == nullptr)
+    {
+      set_fe_conn_info_ptr((void*)new cal_connection_info());
+      thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
+    }
+
+    cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
+    static const std::string wrongArg("Wrong argument produced");
+
+    // Called without a selector: default to the optimized plan instead of the error text.
+    const bool noSelector = (args == nullptr || args->arg_count == 0);
+    const std::string* out = noSelector ? &ci->queryPlanOptimized : &wrongArg;
+
+    if (!noSelector && args->args[0])
+    {
+      if (args->arg_type[0] == INT_RESULT)
+      {
+        // INT_RESULT: the argument buffer is read as a long long.
+        const long long sel = *reinterpret_cast<long long*>(args->args[0]);
+        if (sel == 0)
+          out = &ci->queryPlanOriginal;
+        else if (sel == 1)
+          out = &ci->queryPlanOptimized;
+        else if (sel == 2)
+          out = &ci->rboAppliedRules;
+      }
+      else if (args->arg_type[0] == STRING_RESULT)
+      {
+        std::string key(args->args[0], args->lengths[0]);
+        // normalize to lower
+        for (auto& ch : key)
+          ch = (char)tolower((unsigned char)ch);
+        if (key == "original" || key == "orig" || key == "0")
+          out = &ci->queryPlanOriginal;
+        else if (key == "optimized" || key == "opt" || key == "1")
+          out = &ci->queryPlanOptimized;
+        else if (key == "rules" || key == "2")
+          out = &ci->rboAppliedRules;
+      }
+    }
+
+    unsigned long l = out->size();
+
+    if (l == 0)
+    {
+      *is_null = 1;
+      return nullptr;
+    }
+
+    if (l > TraceSize)
+      l = TraceSize;
+
+    *length = l;
+    return out->c_str();
+  }
+
+  // Validates the MCSGETPLAN() call: at most one argument, which must be INT or STRING.
+  // Marks the result as nullable and sets its maximum length to TraceSize.
+  my_bool mcsgetplan_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
+  {
+    if (args->arg_count > 1)
+    {
+      sprintf(message, "MCSGETPLAN() takes 0 or 1 argument: [original|optimized|rules|0|1|2]");
+      return 1;
+    }
+    if (args->arg_count == 1 && !(args->arg_type[0] == INT_RESULT || args->arg_type[0] == STRING_RESULT))
+    {
+      sprintf(message, "MCSGETPLAN() argument must be INT or STRING");
+      return 1;
+    }
+    initid->maybe_null = 1;
+    initid->max_length = TraceSize;
+    return 0;
+  }
+
+  // Nothing to release: mcsgetplan_init() stores no allocation in initid.
+  void mcsgetplan_deinit(UDF_INIT* /*initid*/)
+  {
+  }
+
+  // cal-prefixed entry points: calgetplan and calgetplan_init forward to the mcsgetplan versions.
+  const char* calgetplan(UDF_INIT* initid, UDF_ARGS* args, char* result, unsigned long* length, char* is_null,
+                         char* error)
+  {
+    return mcsgetplan(initid, args, result, length, is_null, error);
+  }
+
+  my_bool calgetplan_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
+  {
+    return mcsgetplan_init(initid, args, message);
+  }
+
+  void calgetplan_deinit(UDF_INIT* /*initid*/)
+  {
+  }
+
   my_bool getversion_init(UDF_INIT* /*initid*/, UDF_ARGS* args, char* message, const char* funcname)
   {
     if (args->arg_count != 0)
diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp
index 0282d925f..ad674499c 100644
--- a/dbcon/mysql/ha_mcs_execplan.cpp
+++ b/dbcon/mysql/ha_mcs_execplan.cpp
@@ -100,6 +100,47 @@ using namespace std;
 
 namespace cal_impl_if
 {
+// Helper utilities to store plan strings and applied rules into cal_connection_info
+
+// Which snapshot of the execution plan is being stored.
+enum class PlanType
+{
+  Original,  // stored before optimizer::optimizeCSEP() is called
+  Optimized  // stored after optimizer::optimizeCSEP() has returned
+};
+
+// Returns the cal_connection_info held by get_fe_conn_info_ptr(), creating it and
+// attaching it to current_thd via thd_set_ha_data() when none exists yet.
+static cal_connection_info* ensure_conn_info()
+{
+  if (get_fe_conn_info_ptr() == nullptr)
+  {
+    set_fe_conn_info_ptr((void*)new cal_connection_info());
+    thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
+  }
+  return static_cast<cal_connection_info*>(get_fe_conn_info_ptr());
+}
+
+// Saves csep->toString() into the connection-info field selected by planType.
+static void store_query_plan(execplan::SCSEP& csep, PlanType planType)
+{
+  cal_connection_info* ci = ensure_conn_info();
+  switch (planType)
+  {
+    case PlanType::Original: ci->queryPlanOriginal = csep->toString(); break;
+    case PlanType::Optimized: ci->queryPlanOptimized = csep->toString(); break;
+    default: break;
+  }
+}
+
+// Saves the serialized list of applied RBO rules into the connection info.
+// Taken by const reference so the string is copied only once, on assignment.
+static void store_applied_rules(const std::string& rboRules)
+{
+  cal_connection_info* ci = ensure_conn_info();
+  ci->rboAppliedRules = rboRules;
+}
+
 // This is taken from Item_cond::fix_fields in sql/item_cmpfunc.cc.
 void calculateNotNullTables(const std::vector& condList, table_map& not_null_tables)
 {
@@ -7597,6 +7638,9 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP&
     cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
   }
 
+  // Store original (pre-RBO) plan string for UDFs
+  store_query_plan(csep, PlanType::Original);
+
   // Derived table projection list optimization.
   derivedTableOptimization(&gwi, csep);
 
@@ -7604,7 +7648,11 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP&
   optimizer::RBOptimizerContext ctx(gwi, *thd, csep->traceOn(), get_ces_optimization_parallel_factor(thd));
   // TODO RBO can crash or fail leaving CSEP in an invalid state, so there must be a valid CSEP copy
   // TBD There is a tradeoff b/w copy per rule and copy per optimizer run.
-  bool csepWasOptimized = optimizer::optimizeCSEP(*csep, ctx, get_unstable_optimizer(&ctx.thd));
+  bool csepWasOptimized = optimizer::optimizeCSEP(*csep, ctx, get_unstable_optimizer(&ctx.getThd()));
+
+  // Store optimized plan and applied rules
+  store_query_plan(csep, PlanType::Optimized);
+  store_applied_rules(ctx.serializeAppliedRules());
   if (csep->traceOn() && csepWasOptimized)
   {
     cerr << "---------------- cs_get_select_plan optimized EXECUTION PLAN ----------------" << endl;
diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h
index 1a377ae38..5a32ccbe8 100644
--- a/dbcon/mysql/ha_mcs_impl_if.h
+++ b/dbcon/mysql/ha_mcs_impl_if.h
@@ -404,6 +404,9 @@ struct cal_connection_info
   bool isCacheInsert;
   std::string extendedStats;
   std::string miniStats;
+  std::string queryPlanOriginal;   // CSEP string before RBO
+  std::string queryPlanOptimized;  // CSEP string after RBO
+  std::string rboAppliedRules;     // Comma-separated list of applied RBO rules
   messageqcpp::MessageQueueClient* dmlProc;
   ha_rows rowsHaveInserted;
   ColNameList colNameList;
diff --git a/dbcon/mysql/install_mcs_mysql.sh.in b/dbcon/mysql/install_mcs_mysql.sh.in
index ff80a18cf..708843583 100755
--- a/dbcon/mysql/install_mcs_mysql.sh.in
+++ b/dbcon/mysql/install_mcs_mysql.sh.in
@@ -3,11 +3,11 @@
 pwprompt=" "
 
 for arg in "$@"; do
-	if [ `expr -- "$arg" : '--tmpdir='` -eq 9 ]; then
-		tmpdir="`echo $arg | awk -F= '{print $2}'`"
-	else
-		echo "ignoring unknown argument: $arg" 1>&2
-	fi
+    if [ $(expr -- "$arg" : '--tmpdir=') -eq 9 ]; then
+        tmpdir="${arg#--tmpdir=}"
+    else
+        echo "ignoring unknown argument: $arg" 1>&2
+    fi
 done
 
 if [[ -f /etc/mysql/debian.cnf ]]; then
@@ -17,7 +17,7 @@ else
 fi
 
 # DELETE libcalmysql.so entries first as they are in ha_columnstore.so in 1.4.2 onwards
-$MDB 2> ${tmpdir}/mysql_install.log <${tmpdir}/mysql_install.log <schemaName(), simpleColumn->tableName()};
-      auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName);
+      auto columnStatistics = ctx.getGwi().findStatisticsForATable(schemaAndTableName);
       if (!columnStatistics)
       {
         continue;
@@ -231,8 +231,8 @@ std::optional> chooseKeyC
 {
   cal_impl_if::SchemaAndTableName schemaAndTableName = {targetTable.schema, targetTable.table};
 
-  auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName);
-  if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end() ||
+  auto tableColumnsStatisticsIt = ctx.getGwi().tableStatisticsMap.find(schemaAndTableName);
+  if (tableColumnsStatisticsIt == ctx.getGwi().tableStatisticsMap.end() ||
       tableColumnsStatisticsIt->second.empty())
   {
     return std::nullopt;
@@ -281,7 +281,7 @@ std::optional> populateRangeBounds(Histogram_json_
   };
 
   // Get parallel factor from context
-  size_t maxParallelFactor = ctx.cesOptimizationParallelFactor;
+  size_t maxParallelFactor = ctx.getCesOptimizationParallelFactor();
   std::cout << "populateRangeBounds() columnStatistics->buckets.size() "
             << columnStatistics->get_json_histogram().size() << std::endl;
   std::cout << "Session ces_optimization_parallel_factor: " << maxParallelFactor << std::endl;
@@ -567,11 +567,11 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBO
   for (auto& table : tables)
   {
     cal_impl_if::SchemaAndTableName schemaAndTableName = {table.schema, table.table};
-    auto anyColumnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName);
+    auto anyColumnStatistics = ctx.getGwi().findStatisticsForATable(schemaAndTableName);
     if (!table.isColumnstore() && anyColumnStatistics)
     {
       std::string tableAlias = optimizer::RewrittenSubTableAliasPrefix + table.schema + "_" + table.table +
-                               "_" + std::to_string(ctx.uniqueId);
+                               "_" + std::to_string(ctx.getUniqueId());
       tableAliasToSCPositionsMap.insert({table, {tableAlias, {}, 0}});
       execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
       newTableList.push_back(tn);
diff --git a/dbcon/rbo/rbo_predicate_pushdown.cpp b/dbcon/rbo/rbo_predicate_pushdown.cpp
index d98eace65..444c94328 100644
--- a/dbcon/rbo/rbo_predicate_pushdown.cpp
+++ b/dbcon/rbo/rbo_predicate_pushdown.cpp
@@ -167,7 +167,7 @@ bool applyPredicatePushdown(execplan::CalpontSelectExecutionPlan& csep, RBOptimi
   if (pt)
   {
     pt->walk(setDerivedTable);
-    setDerivedFilter(&ctx.gwi, pt, derivedTbFilterMap, derivedTbList);
+    setDerivedFilter(&ctx.getGwi(), pt, derivedTbFilterMap, derivedTbList);
     csep.filters(pt);
   }
 
diff --git a/dbcon/rbo/rulebased_optimizer.cpp b/dbcon/rbo/rulebased_optimizer.cpp
index 44e800ee0..eb17501e2 100644
--- a/dbcon/rbo/rulebased_optimizer.cpp
+++ b/dbcon/rbo/rulebased_optimizer.cpp
@@ -102,10 +102,15 @@ bool Rule::apply(execplan::CalpontSelectExecutionPlan& root, optimizer::RBOptimi
   {
     changedThisRound = walk(root, ctx);
     hasBeenApplied |= changedThisRound;
-    if (ctx.logRules && changedThisRound)
+    if (ctx.logRulesEnabled() && changedThisRound)
     {
       std::cout << "MCS RBO: " << name << " has been applied this round." << std::endl;
     }
+    if (changedThisRound)
+    {
+      // Record rule application
+      ctx.addAppliedRule(name);
+    }
   } while (changedThisRound && !applyOnlyOnce);
 
   return hasBeenApplied;
@@ -149,7 +154,7 @@ bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimiz
     if (mayApply(*current, ctx))
     {
       rewrite |= applyRule(*current, ctx);
-      ++ctx.uniqueId;
+      ctx.incrementUniqueId();
     }
   }
 
diff --git a/dbcon/rbo/rulebased_optimizer.h b/dbcon/rbo/rulebased_optimizer.h
index 97dbb112d..3c9f1b187 100644
--- a/dbcon/rbo/rulebased_optimizer.h
+++ b/dbcon/rbo/rulebased_optimizer.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include
+#include
 
 #define PREFER_MY_CONFIG_H
 #include
@@ -35,16 +36,45 @@ class RBOptimizerContext
  public:
   RBOptimizerContext() = delete;
   RBOptimizerContext(cal_impl_if::gp_walk_info& walk_info, THD& thd, bool logRules, uint cesOptimizationParallelFactor = 50)
-   : gwi(walk_info), thd(thd), logRules(logRules), cesOptimizationParallelFactor(cesOptimizationParallelFactor)
+   : gwi_(walk_info), thd_(thd), logRules_(logRules), cesOptimizationParallelFactor_(cesOptimizationParallelFactor)
   {
   }
+
+  // Accessors
+  cal_impl_if::gp_walk_info& getGwi() { return gwi_; }
+  THD& getThd() { return thd_; }
+  uint64_t getUniqueId() const { return uniqueId_; }
+  void incrementUniqueId() { ++uniqueId_; }
+  bool logRulesEnabled() const { return logRules_; }
+  uint getCesOptimizationParallelFactor() const { return cesOptimizationParallelFactor_; }
+
+  // Applied rules API
+  // Appends a rule name; names are kept in the order they are added, duplicates included.
+  void addAppliedRule(const std::string& name) { appliedRules_.push_back(name); }
+  const std::vector<std::string>& getAppliedRules() const { return appliedRules_; }
+  bool hasAppliedRules() const { return !appliedRules_.empty(); }
+  // Joins the recorded rule names with ',' in insertion order; empty string when none.
+  std::string serializeAppliedRules() const
+  {
+    std::string out;
+    for (size_t i = 0; i < appliedRules_.size(); ++i)
+    {
+      if (i) out += ",";
+      out += appliedRules_[i];
+    }
+    return out;
+  }
+
+ private:
   // gwi lifetime should be longer than optimizer context.
   // In plugin runtime this is always true.
-  cal_impl_if::gp_walk_info& gwi;
-  THD& thd;
-  uint64_t uniqueId{0};
-  bool logRules{false};
-  uint cesOptimizationParallelFactor;
+  cal_impl_if::gp_walk_info& gwi_;
+  THD& thd_;
+  uint64_t uniqueId_{0};
+  bool logRules_{false};
+  uint cesOptimizationParallelFactor_;
+  // Names of rules that were actually applied in order
+  std::vector<std::string> appliedRules_;
 };
 
 struct Rule
diff --git a/mysql-test/columnstore/include/drop_functions.inc b/mysql-test/columnstore/include/drop_functions.inc
index a55af261c..8e2fdefce 100644
--- a/mysql-test/columnstore/include/drop_functions.inc
+++ b/mysql-test/columnstore/include/drop_functions.inc
@@ -60,4 +60,5 @@ DROP FUNCTION calenablepartitionsbyvalue;
 DROP FUNCTION calshowpartitionsbyvalue;
 DROP FUNCTION moda;
 DROP FUNCTION mcs_set_ddldebug_level;
+DROP FUNCTION mcsgetplan;
 --enable_query_log
diff --git a/mysql-test/columnstore/include/functions.inc b/mysql-test/columnstore/include/functions.inc
index 9d9602aa3..807df8a5e 100644
--- a/mysql-test/columnstore/include/functions.inc
+++ b/mysql-test/columnstore/include/functions.inc
@@ -60,4 +60,6 @@ CREATE OR REPLACE FUNCTION calenablepartitionsbyvalue RETURNS STRING SONAME 'ha_
 CREATE OR REPLACE FUNCTION calshowpartitionsbyvalue RETURNS STRING SONAME 'ha_columnstore.so';
 CREATE OR REPLACE AGGREGATE FUNCTION moda RETURNS STRING SONAME 'libregr_mysql.so';
 CREATE OR REPLACE FUNCTION mcs_set_ddldebug_level RETURNS STRING SONAME 'ha_columnstore.so';
+CREATE OR REPLACE FUNCTION mcsgetplan RETURNS STRING SONAME 'ha_columnstore.so';
+
 --enable_query_log