You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-31 18:30:33 +03:00 
			
		
		
		
	MCOL-6145: mcsgetplan() UDF for CSEP printing
This commit is contained in:
		| @@ -924,6 +924,103 @@ extern "C" | ||||
|   { | ||||
|   } | ||||
|  | ||||
|   // --- Plan UDFs --- | ||||
|   // mcsgetplan([selector]) returns one of the strings stored in the connection's | ||||
|   // cal_connection_info, picked by the optional selector argument: | ||||
|   //   no argument      -> post-RBO plan (same as 1) | ||||
|   //   0 or 'original'  -> pre-RBO plan | ||||
|   //   1 or 'optimized' -> post-RBO plan | ||||
|   //   2 or 'rules'     -> comma-separated applied RBO rules | ||||
|   // String selectors are compared case-insensitively; 'orig' and 'opt' are accepted as | ||||
|   // short forms. A NULL or unrecognised selector yields the text "Wrong argument produced". | ||||
|   // | ||||
|   // Sets *is_null and returns nullptr when the selected string is empty; otherwise stores | ||||
|   // the byte count (capped at TraceSize) in *length and returns the selected string's own | ||||
|   // buffer, so nothing is copied into the `result` buffer. | ||||
|   const char* mcsgetplan(UDF_INIT* /*initid*/, UDF_ARGS* args, char* /*result*/, unsigned long* length, | ||||
|                          char* is_null, char* /*error*/) | ||||
|   { | ||||
|     // Create the connection info on first use and register it for current_thd. | ||||
|     if (get_fe_conn_info_ptr() == nullptr) | ||||
|     { | ||||
|       set_fe_conn_info_ptr(static_cast<void*>(new cal_connection_info())); | ||||
|       thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); | ||||
|     } | ||||
|  | ||||
|     cal_connection_info* ci = static_cast<cal_connection_info*>(get_fe_conn_info_ptr()); | ||||
|     static const std::string wrongArg("Wrong argument produced"); | ||||
|     const std::string* out = &wrongArg; | ||||
|  | ||||
|     if (!args || args->arg_count == 0) | ||||
|     { | ||||
|       // No selector supplied: fall back to the optimized plan instead of the error text. | ||||
|       out = &ci->queryPlanOptimized; | ||||
|     } | ||||
|     else if (args->args[0]) | ||||
|     { | ||||
|       if (args->arg_type[0] == INT_RESULT) | ||||
|       { | ||||
|         const long long sel = *reinterpret_cast<long long*>(args->args[0]); | ||||
|         if (sel == 0) | ||||
|           out = &ci->queryPlanOriginal; | ||||
|         else if (sel == 1) | ||||
|           out = &ci->queryPlanOptimized; | ||||
|         else if (sel == 2) | ||||
|           out = &ci->rboAppliedRules; | ||||
|       } | ||||
|       else if (args->arg_type[0] == STRING_RESULT) | ||||
|       { | ||||
|         std::string key(args->args[0], args->lengths[0]); | ||||
|         // Lower-case the key so the comparisons below ignore case. | ||||
|         for (auto& ch : key) | ||||
|           ch = static_cast<char>(tolower(static_cast<unsigned char>(ch))); | ||||
|         if (key == "original" || key == "orig" || key == "0") | ||||
|           out = &ci->queryPlanOriginal; | ||||
|         else if (key == "optimized" || key == "opt" || key == "1") | ||||
|           out = &ci->queryPlanOptimized; | ||||
|         else if (key == "rules" || key == "2") | ||||
|           out = &ci->rboAppliedRules; | ||||
|       } | ||||
|     } | ||||
|  | ||||
|     unsigned long l = out->size(); | ||||
|  | ||||
|     // An empty selection is reported as SQL NULL rather than an empty string. | ||||
|     if (l == 0) | ||||
|     { | ||||
|       *is_null = 1; | ||||
|       return nullptr; | ||||
|     } | ||||
|  | ||||
|     // Never report more than TraceSize bytes, the max_length set by mcsgetplan_init(). | ||||
|     if (l > TraceSize) | ||||
|       l = TraceSize; | ||||
|  | ||||
|     *length = l; | ||||
|     return out->c_str(); | ||||
|   } | ||||
|  | ||||
|   // Init hook for MCSGETPLAN(): accepts zero arguments or a single INT/STRING selector. | ||||
|   // On rejection writes an explanation to `message` and returns 1; otherwise sets | ||||
|   // max_length to TraceSize, marks the result as nullable and returns 0. | ||||
|   my_bool mcsgetplan_init(UDF_INIT* initid, UDF_ARGS* args, char* message) | ||||
|   { | ||||
|     const auto argCount = args->arg_count; | ||||
|  | ||||
|     if (argCount > 1) | ||||
|     { | ||||
|       sprintf(message, "MCSGETPLAN() takes 0 or 1 argument: [original|optimized|rules|0|1|2]"); | ||||
|       return 1; | ||||
|     } | ||||
|  | ||||
|     if (argCount == 1) | ||||
|     { | ||||
|       const auto selectorType = args->arg_type[0]; | ||||
|       const bool supported = (selectorType == INT_RESULT) || (selectorType == STRING_RESULT); | ||||
|       if (!supported) | ||||
|       { | ||||
|         sprintf(message, "MCSGETPLAN() argument must be INT or STRING"); | ||||
|         return 1; | ||||
|       } | ||||
|     } | ||||
|  | ||||
|     initid->max_length = TraceSize; | ||||
|     initid->maybe_null = 1; | ||||
|     return 0; | ||||
|   } | ||||
|  | ||||
|   // Deinit hook for MCSGETPLAN(): a no-op. | ||||
|   void mcsgetplan_deinit(UDF_INIT* /*initid*/) {} | ||||
|  | ||||
|   // "cal"-prefixed twin of mcsgetplan(): forwards every argument unchanged and returns | ||||
|   // whatever mcsgetplan() returns. | ||||
|   // NOTE(review): the install SQL visible in this change registers only mcsgetplan; | ||||
|   // confirm whether calgetplan is meant to be registered as well. | ||||
|   const char* calgetplan(UDF_INIT* initid, UDF_ARGS* args, char* result, unsigned long* length, char* is_null, | ||||
|                          char* error) | ||||
|   { | ||||
|     const char* plan = mcsgetplan(initid, args, result, length, is_null, error); | ||||
|     return plan; | ||||
|   } | ||||
|  | ||||
|   // Init hook for calgetplan(): applies exactly the checks of mcsgetplan_init() and | ||||
|   // passes its status back to the caller. | ||||
|   my_bool calgetplan_init(UDF_INIT* initid, UDF_ARGS* args, char* message) | ||||
|   { | ||||
|     const my_bool status = mcsgetplan_init(initid, args, message); | ||||
|     return status; | ||||
|   } | ||||
|  | ||||
|   // Deinit hook for calgetplan(): a no-op. | ||||
|   void calgetplan_deinit(UDF_INIT* /*initid*/) {} | ||||
|  | ||||
|   my_bool getversion_init(UDF_INIT* /*initid*/, UDF_ARGS* args, char* message, const char* funcname) | ||||
|   { | ||||
|     if (args->arg_count != 0) | ||||
|   | ||||
| @@ -100,6 +100,42 @@ using namespace std; | ||||
|  | ||||
| namespace cal_impl_if | ||||
| { | ||||
| // Helper utilities to store plan strings and applied rules into cal_connection_info | ||||
|  | ||||
| // Selects which plan string store_query_plan() overwrites. | ||||
| enum class PlanType | ||||
| { | ||||
|   Original,  // stored into cal_connection_info::queryPlanOriginal | ||||
|   Optimized  // stored into cal_connection_info::queryPlanOptimized | ||||
| }; | ||||
|  | ||||
| // Returns the connection info pointer held by get_fe_conn_info_ptr(). When none is set | ||||
| // yet, a fresh cal_connection_info is allocated, stored with set_fe_conn_info_ptr() and | ||||
| // registered through thd_set_ha_data() for current_thd before being returned. | ||||
| static cal_connection_info* ensure_conn_info() | ||||
| { | ||||
|   if (get_fe_conn_info_ptr() != NULL) | ||||
|   { | ||||
|     return static_cast<cal_connection_info*>(get_fe_conn_info_ptr()); | ||||
|   } | ||||
|  | ||||
|   set_fe_conn_info_ptr((void*)new cal_connection_info()); | ||||
|   thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); | ||||
|   return static_cast<cal_connection_info*>(get_fe_conn_info_ptr()); | ||||
| } | ||||
|  | ||||
| // Serializes `csep` with toString() and keeps the text on the connection info: | ||||
| // PlanType::Original fills queryPlanOriginal, PlanType::Optimized fills queryPlanOptimized. | ||||
| // Any other value leaves both strings untouched. | ||||
| static void store_query_plan(execplan::SCSEP& csep, PlanType planType) | ||||
| { | ||||
|   cal_connection_info* connInfo = ensure_conn_info(); | ||||
|  | ||||
|   if (planType == PlanType::Original) | ||||
|   { | ||||
|     connInfo->queryPlanOriginal = csep->toString(); | ||||
|   } | ||||
|   else if (planType == PlanType::Optimized) | ||||
|   { | ||||
|     connInfo->queryPlanOptimized = csep->toString(); | ||||
|   } | ||||
| } | ||||
|  | ||||
|  | ||||
| // Saves the comma-separated list of applied RBO rules into the connection info's | ||||
| // rboAppliedRules field, creating the connection info first if it does not exist yet. | ||||
| // The argument is taken by const reference: it is only read here, so the single copy | ||||
| // made is the assignment into rboAppliedRules. | ||||
| static void store_applied_rules(const std::string& rboRules) | ||||
| { | ||||
|   cal_connection_info* ci = ensure_conn_info(); | ||||
|   ci->rboAppliedRules = rboRules; | ||||
| } | ||||
|  | ||||
| // This is taken from Item_cond::fix_fields in sql/item_cmpfunc.cc. | ||||
| void calculateNotNullTables(const std::vector<COND*>& condList, table_map& not_null_tables) | ||||
| { | ||||
| @@ -7597,6 +7633,9 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& | ||||
|     cerr << "-------------- EXECUTION PLAN END --------------\n" << endl; | ||||
|   } | ||||
|  | ||||
|   // Store original (pre-RBO) plan string for UDFs | ||||
|   store_query_plan(csep, PlanType::Original); | ||||
|  | ||||
|   // Derived table projection list optimization. | ||||
|   derivedTableOptimization(&gwi, csep); | ||||
|  | ||||
| @@ -7604,7 +7643,11 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& | ||||
|     optimizer::RBOptimizerContext ctx(gwi, *thd, csep->traceOn(), get_ces_optimization_parallel_factor(thd)); | ||||
|     // TODO RBO can crash or fail leaving CSEP in an invalid state, so there must be a valid CSEP copy | ||||
|     // TBD There is a tradeoff b/w copy per rule and copy per optimizer run. | ||||
|     bool csepWasOptimized = optimizer::optimizeCSEP(*csep, ctx, get_unstable_optimizer(&ctx.thd)); | ||||
|     bool csepWasOptimized = optimizer::optimizeCSEP(*csep, ctx, get_unstable_optimizer(&ctx.getThd())); | ||||
|  | ||||
|     // Store optimized plan and applied rules | ||||
|     store_query_plan(csep, PlanType::Optimized); | ||||
|     store_applied_rules(ctx.serializeAppliedRules()); | ||||
|     if (csep->traceOn() && csepWasOptimized) | ||||
|     { | ||||
|       cerr << "---------------- cs_get_select_plan optimized EXECUTION PLAN ----------------" << endl; | ||||
|   | ||||
| @@ -404,6 +404,9 @@ struct cal_connection_info | ||||
|   bool isCacheInsert; | ||||
|   std::string extendedStats; | ||||
|   std::string miniStats; | ||||
|   std::string queryPlanOriginal;     // CSEP string before RBO | ||||
|   std::string queryPlanOptimized;    // CSEP string after RBO | ||||
|   std::string rboAppliedRules;       // Comma-separated list of applied RBO rules | ||||
|   messageqcpp::MessageQueueClient* dmlProc; | ||||
|   ha_rows rowsHaveInserted; | ||||
|   ColNameList colNameList; | ||||
|   | ||||
| @@ -3,8 +3,8 @@ | ||||
| pwprompt=" " | ||||
|  | ||||
| for arg in "$@"; do | ||||
| 	if [ `expr -- "$arg" : '--tmpdir='` -eq 9 ]; then | ||||
| 		tmpdir="`echo $arg | awk -F= '{print $2}'`" | ||||
|   if [ $(expr -- "$arg" : '--tmpdir=') -eq 9 ]; then | ||||
|     tmpdir="$(echo $arg | awk -F= '{print $2}')" | ||||
|   else | ||||
|     echo "ignoring unknown argument: $arg" 1>&2 | ||||
|   fi | ||||
| @@ -17,7 +17,7 @@ else | ||||
| fi | ||||
|  | ||||
| # DELETE libcalmysql.so entries first as they are in ha_columnstore.so in 1.4.2 onwards | ||||
| $MDB 2> ${tmpdir}/mysql_install.log <<EOD | ||||
| $MDB 2>${tmpdir}/mysql_install.log <<EOD | ||||
| DELETE FROM mysql.func WHERE dl='libcalmysql.so'; | ||||
| CREATE OR REPLACE FUNCTION mcsgetstats RETURNS STRING SONAME 'ha_columnstore.so'; | ||||
| CREATE OR REPLACE FUNCTION calgetstats RETURNS STRING SONAME 'ha_columnstore.so'; | ||||
| @@ -80,6 +80,7 @@ CREATE OR REPLACE FUNCTION calenablepartitionsbyvalue RETURNS STRING SONAME 'ha_ | ||||
| CREATE OR REPLACE FUNCTION calshowpartitionsbyvalue RETURNS STRING SONAME 'ha_columnstore.so'; | ||||
| CREATE OR REPLACE AGGREGATE FUNCTION moda RETURNS STRING SONAME 'libregr_mysql.so'; | ||||
| CREATE OR REPLACE FUNCTION mcs_set_ddldebug_level RETURNS STRING SONAME 'ha_columnstore.so'; | ||||
| CREATE OR REPLACE FUNCTION mcsgetplan RETURNS STRING SONAME 'ha_columnstore.so'; | ||||
|  | ||||
| CREATE DATABASE IF NOT EXISTS infinidb_querystats; | ||||
| CREATE TABLE IF NOT EXISTS infinidb_querystats.querystats | ||||
|   | ||||
| @@ -80,7 +80,7 @@ bool someForeignTablesHasStatisticsAndMbIndex(execplan::CalpontSelectExecutionPl | ||||
|       { | ||||
|         cal_impl_if::SchemaAndTableName schemaAndTableName = {table.schema, table.table}; | ||||
|         return (!table.isColumnstore() && | ||||
|                 ctx.gwi.tableStatisticsMap.find(schemaAndTableName) != ctx.gwi.tableStatisticsMap.end()); | ||||
|                 ctx.getGwi().tableStatisticsMap.find(schemaAndTableName) != ctx.getGwi().tableStatisticsMap.end()); | ||||
|       }); | ||||
| } | ||||
|  | ||||
| @@ -202,7 +202,7 @@ execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPl | ||||
|       cal_impl_if::SchemaAndTableName schemaAndTableName = {simpleColumn->schemaName(), | ||||
|                                                             simpleColumn->tableName()}; | ||||
|  | ||||
|       auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName); | ||||
|       auto columnStatistics = ctx.getGwi().findStatisticsForATable(schemaAndTableName); | ||||
|       if (!columnStatistics) | ||||
|       { | ||||
|         continue; | ||||
| @@ -231,8 +231,8 @@ std::optional<std::pair<execplan::SimpleColumn&, Histogram_json_hb*>> chooseKeyC | ||||
| { | ||||
|   cal_impl_if::SchemaAndTableName schemaAndTableName = {targetTable.schema, targetTable.table}; | ||||
|  | ||||
|   auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName); | ||||
|   if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end() || | ||||
|   auto tableColumnsStatisticsIt = ctx.getGwi().tableStatisticsMap.find(schemaAndTableName); | ||||
|   if (tableColumnsStatisticsIt == ctx.getGwi().tableStatisticsMap.end() || | ||||
|       tableColumnsStatisticsIt->second.empty()) | ||||
|   { | ||||
|     return std::nullopt; | ||||
| @@ -281,7 +281,7 @@ std::optional<details::FilterRangeBounds<T>> populateRangeBounds(Histogram_json_ | ||||
|   }; | ||||
|  | ||||
|   // Get parallel factor from context | ||||
|   size_t maxParallelFactor = ctx.cesOptimizationParallelFactor; | ||||
|   size_t maxParallelFactor = ctx.getCesOptimizationParallelFactor(); | ||||
|   std::cout << "populateRangeBounds() columnStatistics->buckets.size() " | ||||
|             << columnStatistics->get_json_histogram().size() << std::endl; | ||||
|   std::cout << "Session ces_optimization_parallel_factor: " << maxParallelFactor << std::endl; | ||||
| @@ -567,11 +567,11 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBO | ||||
|   for (auto& table : tables) | ||||
|   { | ||||
|     cal_impl_if::SchemaAndTableName schemaAndTableName = {table.schema, table.table}; | ||||
|     auto anyColumnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName); | ||||
|     auto anyColumnStatistics = ctx.getGwi().findStatisticsForATable(schemaAndTableName); | ||||
|     if (!table.isColumnstore() && anyColumnStatistics) | ||||
|     { | ||||
|       std::string tableAlias = optimizer::RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + | ||||
|                                "_" + std::to_string(ctx.uniqueId); | ||||
|                                "_" + std::to_string(ctx.getUniqueId()); | ||||
|       tableAliasToSCPositionsMap.insert({table, {tableAlias, {}, 0}}); | ||||
|       execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, ""); | ||||
|       newTableList.push_back(tn); | ||||
|   | ||||
| @@ -167,7 +167,7 @@ bool applyPredicatePushdown(execplan::CalpontSelectExecutionPlan& csep, RBOptimi | ||||
|    if (pt) | ||||
|    { | ||||
|      pt->walk(setDerivedTable); | ||||
|      setDerivedFilter(&ctx.gwi, pt, derivedTbFilterMap, derivedTbList); | ||||
|      setDerivedFilter(&ctx.getGwi(), pt, derivedTbFilterMap, derivedTbList); | ||||
|      csep.filters(pt); | ||||
|    } | ||||
|   | ||||
|   | ||||
| @@ -102,10 +102,15 @@ bool Rule::apply(execplan::CalpontSelectExecutionPlan& root, optimizer::RBOptimi | ||||
|   { | ||||
|     changedThisRound = walk(root, ctx); | ||||
|     hasBeenApplied |= changedThisRound; | ||||
|     if (ctx.logRules && changedThisRound) | ||||
|     if (ctx.logRulesEnabled() && changedThisRound) | ||||
|     { | ||||
|       std::cout << "MCS RBO: " << name << " has been applied this round." << std::endl; | ||||
|     } | ||||
|     if (changedThisRound) | ||||
|     { | ||||
|       // Record rule application | ||||
|       ctx.addAppliedRule(name); | ||||
|     } | ||||
|   } while (changedThisRound && !applyOnlyOnce); | ||||
|  | ||||
|   return hasBeenApplied; | ||||
| @@ -149,7 +154,7 @@ bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimiz | ||||
|     if (mayApply(*current, ctx)) | ||||
|     { | ||||
|       rewrite |= applyRule(*current, ctx); | ||||
|       ++ctx.uniqueId; | ||||
|       ctx.incrementUniqueId(); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   | ||||
| @@ -18,6 +18,7 @@ | ||||
| #pragma once | ||||
|  | ||||
| #include <string> | ||||
| #include <vector> | ||||
|  | ||||
| #define PREFER_MY_CONFIG_H | ||||
| #include <my_config.h> | ||||
| @@ -35,16 +36,43 @@ class RBOptimizerContext | ||||
|  public: | ||||
|   RBOptimizerContext() = delete; | ||||
|   RBOptimizerContext(cal_impl_if::gp_walk_info& walk_info, THD& thd, bool logRules, uint cesOptimizationParallelFactor = 50) | ||||
|    : gwi(walk_info), thd(thd), logRules(logRules), cesOptimizationParallelFactor(cesOptimizationParallelFactor) | ||||
|    : gwi_(walk_info), thd_(thd), logRules_(logRules), cesOptimizationParallelFactor_(cesOptimizationParallelFactor) | ||||
|   { | ||||
|   } | ||||
|  | ||||
|   // Accessors | ||||
|   // gwi_ and thd_ are reference members, so these two getters hand out mutable references. | ||||
|   cal_impl_if::gp_walk_info& getGwi() | ||||
|   { | ||||
|     return gwi_; | ||||
|   } | ||||
|   THD& getThd() | ||||
|   { | ||||
|     return thd_; | ||||
|   } | ||||
|   // Counter read by getUniqueId() and advanced by one on each incrementUniqueId() call. | ||||
|   uint64_t getUniqueId() const | ||||
|   { | ||||
|     return uniqueId_; | ||||
|   } | ||||
|   void incrementUniqueId() | ||||
|   { | ||||
|     ++uniqueId_; | ||||
|   } | ||||
|   // Value of the logRules flag passed to the constructor. | ||||
|   bool logRulesEnabled() const | ||||
|   { | ||||
|     return logRules_; | ||||
|   } | ||||
|   // Value of the cesOptimizationParallelFactor passed to the constructor. | ||||
|   uint getCesOptimizationParallelFactor() const | ||||
|   { | ||||
|     return cesOptimizationParallelFactor_; | ||||
|   } | ||||
|  | ||||
|   // Applied rules API | ||||
|   // Appends `name` to the list; names are kept in call order and are not de-duplicated. | ||||
|   void addAppliedRule(const std::string& name) | ||||
|   { | ||||
|     appliedRules_.push_back(name); | ||||
|   } | ||||
|   // Read-only view of the recorded rule names. | ||||
|   const std::vector<std::string>& getAppliedRules() const | ||||
|   { | ||||
|     return appliedRules_; | ||||
|   } | ||||
|   // True once at least one rule name has been recorded. | ||||
|   bool hasAppliedRules() const | ||||
|   { | ||||
|     return !appliedRules_.empty(); | ||||
|   } | ||||
|   // Joins the recorded rule names with "," in recording order, with no leading or | ||||
|   // trailing separator. Returns an empty string when nothing was recorded. | ||||
|   std::string serializeAppliedRules() const | ||||
|   { | ||||
|     std::string joined; | ||||
|     const char* separator = ""; | ||||
|     for (const std::string& ruleName : appliedRules_) | ||||
|     { | ||||
|       joined += separator; | ||||
|       joined += ruleName; | ||||
|       separator = ","; | ||||
|     } | ||||
|     return joined; | ||||
|   } | ||||
|  | ||||
|  private: | ||||
|   // gwi lifetime should be longer than optimizer context. | ||||
|   // In plugin runtime this is always true. | ||||
|   cal_impl_if::gp_walk_info& gwi; | ||||
|   THD& thd; | ||||
|   uint64_t uniqueId{0}; | ||||
|   bool logRules{false}; | ||||
|   uint cesOptimizationParallelFactor; | ||||
|   cal_impl_if::gp_walk_info& gwi_; | ||||
|   THD& thd_; | ||||
|   uint64_t uniqueId_{0}; | ||||
|   bool logRules_{false}; | ||||
|   uint cesOptimizationParallelFactor_; | ||||
|   // Names of rules that were actually applied in order | ||||
|   std::vector<std::string> appliedRules_; | ||||
| }; | ||||
|  | ||||
| struct Rule | ||||
|   | ||||
| @@ -60,4 +60,5 @@ DROP FUNCTION calenablepartitionsbyvalue; | ||||
| DROP FUNCTION calshowpartitionsbyvalue; | ||||
| DROP FUNCTION moda; | ||||
| DROP FUNCTION mcs_set_ddldebug_level; | ||||
| DROP FUNCTION mcsgetplan; | ||||
| --enable_query_log | ||||
|   | ||||
| @@ -60,4 +60,6 @@ CREATE OR REPLACE FUNCTION calenablepartitionsbyvalue RETURNS STRING SONAME 'ha_ | ||||
| CREATE OR REPLACE FUNCTION calshowpartitionsbyvalue RETURNS STRING SONAME 'ha_columnstore.so'; | ||||
| CREATE OR REPLACE AGGREGATE FUNCTION moda RETURNS STRING SONAME 'libregr_mysql.so'; | ||||
| CREATE OR REPLACE FUNCTION mcs_set_ddldebug_level RETURNS STRING SONAME 'ha_columnstore.so'; | ||||
| CREATE OR REPLACE FUNCTION mcsgetplan RETURNS STRING SONAME 'ha_columnstore.so'; | ||||
|  | ||||
| --enable_query_log | ||||
|   | ||||
		Reference in New Issue
	
	Block a user