From 2c6325853760ec198c645469d7211add69629fee Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Sun, 25 Aug 2019 04:05:59 +0300 Subject: [PATCH] MCOL-2178 SH now allows to fallback to other pushdown handlers. SH query execution migrated from SH::init() into create_SH(). There is a session variable columnstore_processing_handlers_fallback that allows to fallback to DH, GBH if SH fails. DH now uses semantic tree check for unsupported features to allow to fallback to GBH or storage API. Fixes GBH related bug when create_GBH() returns a handler for queries with impossible WHERE/HAVING. Fixed bug in FromSubquery::transform() where isUnion is set to true. Enabled RTTI b/c server team enabled it for MDB. Removed unused code supposed to be used with vtable. --- dbcon/mysql/CMakeLists.txt | 2 +- dbcon/mysql/ha_calpont_execplan.cpp | 13 +- dbcon/mysql/ha_calpont_impl.cpp | 169 +++++++++++- dbcon/mysql/ha_calpont_impl.h | 1 + dbcon/mysql/ha_from_sub.cpp | 4 +- dbcon/mysql/ha_mcs_pushdown.cpp | 406 +++++++++++++++++++--------- dbcon/mysql/ha_mcs_sysvars.cpp | 34 ++- dbcon/mysql/ha_mcs_sysvars.h | 3 + dbcon/mysql/ha_view.cpp | 2 - dbcon/mysql/sm.cpp | 2 +- 10 files changed, 469 insertions(+), 167 deletions(-) diff --git a/dbcon/mysql/CMakeLists.txt b/dbcon/mysql/CMakeLists.txt index c74e19343..8ee9c0d6b 100644 --- a/dbcon/mysql/CMakeLists.txt +++ b/dbcon/mysql/CMakeLists.txt @@ -23,7 +23,7 @@ SET ( libcalmysql_SRCS add_definitions(-DMYSQL_DYNAMIC_PLUGIN) -set_source_files_properties(ha_calpont.cpp PROPERTIES COMPILE_FLAGS "-fno-rtti -fno-implicit-templates") +set_source_files_properties(ha_calpont.cpp PROPERTIES COMPILE_FLAGS "-fno-implicit-templates") add_library(calmysql SHARED ${libcalmysql_SRCS}) diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index a91b74de5..ff3a7ec85 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -5893,8 +5893,6 @@ void parse_item (Item* item, vector& field_vec, Item_field* ifp = reinterpret_cast(*(ref->ref)); field_vec.push_back(ifp); } - else - hasNonSupportItem = true; break; } else if ((*(ref->ref))->type() == Item::FUNC_ITEM) @@ -5957,7 +5955,7 @@ void parse_item (Item* item, vector& field_vec, case Item::EXPR_CACHE_ITEM: { // item is a Item_cache_wrapper. Shouldn't get here. - // WIP Why + // DRRTUY TODO Why IDEBUG(std::cerr << "EXPR_CACHE_ITEM in parse_item\n" << std::endl); gwi->fatalParseError = true; // DRRTUY The questionable error text. I've seen @@ -6107,10 +6105,6 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, { for (; table_ptr; table_ptr = table_ptr->next_local) { - // mysql put vtable here for from sub. we ignore it - if (string(table_ptr->table_name.str).find("$vtable") != string::npos) - continue; - // Until we handle recursive cte: // Checking here ensures we catch all with clauses in the query. if (table_ptr->is_recursive_with_table()) @@ -6279,9 +6273,6 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, csep->unionVec(unionVec); csep->distinctUnionNum(distUnionNum); - - if (unionVec.empty()) - gwi.cs_vtable_impossible_where_on_union = true; } gwi.clauseType = WHERE; @@ -6307,7 +6298,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, Item_equal *cur_item_eq; while ((cur_item_eq= li++)) { - // WIP replace the block with + // DRRTUY TODO replace the block with //cur_item_eq->traverse_cond(debug_walk, gwip, Item::POSTFIX); std::cerr << "item_equal("; Item *item; diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 7892f7c73..ed824da4e 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -2940,7 +2940,6 @@ int ha_calpont_impl_delete_table(const char* name) int ha_calpont_impl_write_row(const uchar* buf, TABLE* table) { THD* thd = current_thd; - //sleep(100); // Error out INSERT on VIEW. It's currently not supported. // @note INSERT on VIEW works natually (for simple cases at least), but we choose to turn it off // for now - ZZ. @@ -4846,6 +4845,7 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) gp_walk_info gwi; gwi.thd = thd; + bool err = false; //check whether the system is ready to process statement. #ifndef _MSC_VER @@ -5024,12 +5024,6 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) if (status != 0) goto internal_error; - // @bug 2547. don't need to send the plan if it's impossible where for all unions. - if (gwi.cs_vtable_impossible_where_on_union) - { - return 0; - } - string query; query.assign(idb_mysql_query_str(thd)); csep->data(query); @@ -5103,7 +5097,6 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) string emsgStr; emsgBs >> emsgStr; - bool err = false; if (msg.length() == 4) { @@ -5137,6 +5130,7 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) if (err) { + // CS resets error in create_SH() if fallback is enabled setError(thd, ER_INTERNAL_ERROR, emsgStr); return ER_INTERNAL_ERROR; } @@ -5190,6 +5184,9 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) ti.msTablePtr = dh->table; } + // For SH CS creates SM environment inside select_next(). + // This allows us to try and fail with SH. + if (!sh) { if (ti.tpl_ctx == 0) { @@ -5245,9 +5242,9 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) ti.tpl_scan_ctx->ctp.push_back(ctype); } } + ci->tableMap[table] = ti; } - ci->tableMap[table] = ti; return 0; error: @@ -5271,4 +5268,158 @@ internal_error: return ER_INTERNAL_ERROR; } + +int ha_cs_impl_select_next(uchar* buf, TABLE* table) +{ + int rc = HA_ERR_END_OF_FILE; + THD* thd = current_thd; + + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + + if (thd->slave_thread && !ci->replicationEnabled && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + + if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || + ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) + return rc; + + // @bug 2547 + // MCOL-2178 This variable can never be true in the scope of this function + // if (MIGR::infinidb_vtable.impossibleWhereOnUnion) + // return HA_ERR_END_OF_FILE; + + // @bug 3078 + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) + { + force_close_fep_conn(thd, ci); + return 0; + } + + if (ci->alterTableState > 0) return rc; + + cal_table_info ti; + ti= ci->tableMap[table]; + // This is the server's temp table for the result. + ti.msTablePtr= table; + sm::tableid_t tableid= execplan::IDB_VTABLE_ID; + sm::cpsm_conhdl_t* hndl= ci->cal_conn_hndl; + + if (!ti.tpl_ctx || !ti.tpl_scan_ctx || (hndl && hndl->queryState == sm::NO_QUERY)) + { + if (ti.tpl_ctx == 0) + { + ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); + } + + // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql + // call rnd_init for a table more than once. + ti.tpl_scan_ctx->rowGroup = NULL; + + try + { + sm::tpl_open(tableid, ti.tpl_ctx, hndl); + sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl); + } + catch (std::exception& e) + { + uint32_t sessionID = tid2sid(thd->thread_id); + string emsg = "table can not be opened: " + string(e.what()); + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + catch (...) + { + uint32_t sessionID = tid2sid(thd->thread_id); + string emsg = "table can not be opened"; + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + + ti.tpl_scan_ctx->traceFlags = ci->traceFlags; + + if ((ti.tpl_scan_ctx->ctp).size() == 0) + { + uint32_t num_attr = table->s->fields; + + for (uint32_t i = 0; i < num_attr; i++) + { + CalpontSystemCatalog::ColType ctype; + ti.tpl_scan_ctx->ctp.push_back(ctype); + } + } + ci->tableMap[table] = ti; + } + + if (!ti.tpl_ctx || !ti.tpl_scan_ctx) + { + uint32_t sessionID = tid2sid(thd->thread_id); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + return ER_INTERNAL_ERROR; + } + + idbassert(ti.msTablePtr == table); + + try + { + rc = fetchNextRow(buf, ti, ci); + } + catch (std::exception& e) + { + uint32_t sessionID = tid2sid(thd->thread_id); + string emsg = string("Error while fetching from ExeMgr: ") + e.what(); + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + return ER_INTERNAL_ERROR; + } + + ci->tableMap[table]= ti; + + if (rc != 0 && rc != HA_ERR_END_OF_FILE) + { + string emsg; + + // remove this check when all error handling migrated to the new framework. + if (rc >= 1000) + emsg = ti.tpl_scan_ctx->errMsg; + else + { + logging::ErrorCodes errorcodes; + emsg = errorcodes.errorString(rc); + } + + uint32_t sessionID = tid2sid(thd->thread_id); + setError(thd, ER_INTERNAL_ERROR, emsg); + ci->stats.fErrorNo = rc; + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + rc = ER_INTERNAL_ERROR; + } + + return rc; + +internal_error: + + if (ci->cal_conn_hndl) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + return ER_INTERNAL_ERROR; + +} + // vim:sw=4 ts=4: diff --git a/dbcon/mysql/ha_calpont_impl.h b/dbcon/mysql/ha_calpont_impl.h index ee96876db..997544905 100644 --- a/dbcon/mysql/ha_calpont_impl.h +++ b/dbcon/mysql/ha_calpont_impl.h @@ -47,6 +47,7 @@ extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info , TABLE* table); +extern int ha_cs_impl_select_next(uchar *buf, TABLE *table); #endif diff --git a/dbcon/mysql/ha_from_sub.cpp b/dbcon/mysql/ha_from_sub.cpp index 0093330aa..014ded75e 100644 --- a/dbcon/mysql/ha_from_sub.cpp +++ b/dbcon/mysql/ha_from_sub.cpp @@ -349,7 +349,9 @@ SCSEP FromSubQuery::transform() csep->derivedTbAlias(fAlias); // always lower case csep->derivedTbView(fGwip.viewName.alias); - if (getSelectPlan(gwi, *fFromSub, csep, fPushdownHand) != 0) + // DRRTUY isUnion - false. fPushdownHand could be safely set to true + // b/c only pushdowns get here. + if (getSelectPlan(gwi, *fFromSub, csep, false, fPushdownHand) != 0) { fGwip.fatalParseError = true; diff --git a/dbcon/mysql/ha_mcs_pushdown.cpp b/dbcon/mysql/ha_mcs_pushdown.cpp index 713b70145..bc9f26d00 100644 --- a/dbcon/mysql/ha_mcs_pushdown.cpp +++ b/dbcon/mysql/ha_mcs_pushdown.cpp @@ -14,12 +14,13 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - // ha_calpont.cpp includes this file. +void check_walk(const Item* item, void* arg); + void mutate_optimizer_flags(THD *thd_) { - // MCOL-2178 Disable all optimizer flags as it was in the fork. + // MCOL-2178 Disable all optimizer flags as it was in the fork. // CS restores it later in SH::scan_end() and in case of an error // in SH::scan_init() set_original_optimizer_flags(thd_->variables.optimizer_switch, thd_); @@ -38,65 +39,196 @@ void restore_optimizer_flags(THD *thd_) set_original_optimizer_flags(0, thd_); } } - -/*@brief check_walk - It traverses filter conditions*/ -/************************************************************ - * DESCRIPTION: - * It traverses filter predicates looking for unsupported - * JOIN types: non-equi JOIN, e.g t1.c1 > t2.c2; - * logical OR. - * PARAMETERS: - * thd - THD pointer. - * derived - TABLE_LIST* to work with. - * RETURN: - * derived_handler if possible - * NULL in other case - ***********************************************************/ -void check_walk(const Item* item, void* arg) +/*@brief find_tables - This traverses Item */ +/********************************************************** +* DESCRIPTION: +* This f() pushes TABLE of an Item_field to a list +* provided. The list is used for JOIN predicate check in +* is_joinkeys_predicate(). +* PARAMETERS: +* Item * Item to check +* RETURN: +***********************************************************/ +void find_tables(const Item* item, void* arg) { - bool* unsupported_feature = static_cast(arg); + if (typeid(*item) == typeid(Item_field)) + { + Item_field *ifp= (Item_field*)item; + List *tables_list= (List
*)arg; + tables_list->push_back(ifp->field->table); + } +} + +/*@brief is_joinkeys_predicate - This traverses Item_func*/ +/*********************************************************** +* DESCRIPTION: +* This f() walks Item_func and checks whether it contains +* JOIN predicate +* PARAMETERS: +* Item_func * Item to walk +* RETURN: +* BOOL false if Item_func isn't a JOIN predicate +* BOOL true otherwise +***********************************************************/ +bool is_joinkeys_predicate(const Item_func *ifp) +{ + bool result = false; + if(ifp->argument_count() == 2) + { + if (ifp->arguments()[0]->type() == Item::FIELD_ITEM && + ifp->arguments()[1]->type() == Item::FIELD_ITEM) + { + Item_field* left= reinterpret_cast(ifp->arguments()[0]); + Item_field* right= reinterpret_cast(ifp->arguments()[1]); + + // If MDB crashes here with non-fixed Item_field and field == NULL + // there must be a check over on_expr for a different SELECT_LEX. + // e.g. checking subquery with ON from upper query. + if (left->field->table != right->field->table) + { + result= true; + } + } + else + { + List
llt; List
rlt; + Item *left= ifp->arguments()[0]; + Item *right= ifp->arguments()[1]; + // Search for tables inside left and right expressions + // and compare them + left->traverse_cond(find_tables, (void*)&llt, Item::POSTFIX); + right->traverse_cond(find_tables, (void*)&rlt, Item::POSTFIX); + // TODO Find the way to have more then one element or prove + // the idea is useless. + if (llt.elements && rlt.elements && (llt.elem(0) != rlt.elem(0))) + { + result= true; + } + } + } + return result; +} + +/*@brief find_nonequi_join - This traverses Item */ +/************************************************************ +* DESCRIPTION: +* This f() walks Item and looks for a non-equi join +* predicates +* PARAMETERS: +* Item * Item to walk +* RETURN: +***********************************************************/ +void find_nonequi_join(const Item* item, void *arg) +{ + bool *unsupported_feature = reinterpret_cast(arg); if ( *unsupported_feature ) return; + + if (item->type() == Item::FUNC_ITEM) + { + const Item_func* ifp = reinterpret_cast(item); + //TODO Check for IN + //NOT IN + correlated subquery + if (ifp->functype() != Item_func::EQ_FUNC) + { + if (is_joinkeys_predicate(ifp)) + *unsupported_feature = true; + else if (ifp->functype() == Item_func::NOT_FUNC + && ifp->arguments()[0]->type() == Item::EXPR_CACHE_ITEM) + { + check_walk(ifp->arguments()[0], arg); + } + } + } +} + +/*@brief find_join - This traverses Item */ +/************************************************************ +* DESCRIPTION: +* This f() walks traverses Item looking for JOIN, SEMI-JOIN +* predicates. +* PARAMETERS: +* Item * Item to traverse +* RETURN: +***********************************************************/ +void find_join(const Item* item, void* arg) +{ + bool *unsupported_feature = reinterpret_cast(arg); + if ( *unsupported_feature ) + return; + + if (item->type() == Item::FUNC_ITEM) + { + const Item_func* ifp = reinterpret_cast(item); + //TODO Check for IN + //NOT IN + correlated subquery + { + if (is_joinkeys_predicate(ifp)) + *unsupported_feature = true; + else if (ifp->functype() == Item_func::NOT_FUNC + && ifp->arguments()[0]->type() == Item::EXPR_CACHE_ITEM) + { + check_walk(ifp->arguments()[0], arg); + } + } + } +} + +/*@brief save_join_predicate - This traverses Item */ +/************************************************************ +* DESCRIPTION: +* This f() walks Item and saves found JOIN predicates into +* a List. The list will be used for a simple CROSS JOIN +* check in create_DH. +* PARAMETERS: +* Item * Item to walk +* RETURN: +***********************************************************/ +void save_join_predicates(const Item* item, void* arg) +{ + if (item->type() == Item::FUNC_ITEM) + { + const Item_func* ifp= reinterpret_cast(item); + if (is_joinkeys_predicate(ifp)) + { + List *join_preds_list= (List*)arg; + join_preds_list->push_back(const_cast(item)); + } + } +} + + +/*@brief check_walk - It traverses filter conditions */ +/************************************************************ +* DESCRIPTION: +* It traverses filter predicates looking for unsupported +* JOIN types: non-equi JOIN, e.g t1.c1 > t2.c2; +* logical OR. +* PARAMETERS: +* thd - THD pointer. +* derived - TABLE_LIST* to work with. +* RETURN: +***********************************************************/ +void check_walk(const Item* item, void* arg) +{ + bool *unsupported_feature = reinterpret_cast(arg); + if ( *unsupported_feature ) + return; + switch (item->type()) { case Item::FUNC_ITEM: { - const Item_func* ifp = static_cast(item); - - if ( ifp->functype() != Item_func::EQ_FUNC ) // NON-equi JOIN - { - if ( ifp->argument_count() == 2 && - ifp->arguments()[0]->type() == Item::FIELD_ITEM && - ifp->arguments()[1]->type() == Item::FIELD_ITEM ) - { - Item_field* left= static_cast(ifp->arguments()[0]); - Item_field* right= static_cast(ifp->arguments()[1]); - - if ( left->field->table != right->field->table ) - { - *unsupported_feature = true; - return; - } - } - else // IN + correlated subquery - { - if ( ifp->functype() == Item_func::NOT_FUNC - && ifp->arguments()[0]->type() == Item::EXPR_CACHE_ITEM ) - { - check_walk(ifp->arguments()[0], arg); - } - } - } + find_nonequi_join(item, arg); break; } - case Item::EXPR_CACHE_ITEM: // IN + correlated subquery { - const Item_cache_wrapper* icw = static_cast(item); + const Item_cache_wrapper* icw = reinterpret_cast(item); if ( icw->get_orig_item()->type() == Item::FUNC_ITEM ) { - const Item_func *ifp = static_cast(icw->get_orig_item()); + const Item_func *ifp = reinterpret_cast(icw->get_orig_item()); if ( ifp->argument_count() == 2 && ( ifp->arguments()[0]->type() == Item::Item::SUBSELECT_ITEM || ifp->arguments()[1]->type() == Item::Item::SUBSELECT_ITEM )) @@ -107,13 +239,24 @@ void check_walk(const Item* item, void* arg) } break; } - - case Item::COND_ITEM: // OR in JOIN conds is unsupported yet + case Item::COND_ITEM: // OR contains JOIN conds thats is unsupported yet { Item_cond* icp = (Item_cond*)item; if ( is_cond_or(icp) ) { - *unsupported_feature = true; + bool left_flag= false, right_flag= false; + if (icp->argument_list()->elements >= 2) + { + Item *left; Item *right; + List_iterator li(*icp->argument_list()); + left = li++; right = li++; + left->traverse_cond(find_join, (void*)&left_flag, Item::POSTFIX); + right->traverse_cond(find_join, (void*)&right_flag, Item::POSTFIX); + if (left_flag && right_flag) + { + *unsupported_feature = true; + } + } } break; } @@ -172,9 +315,9 @@ create_calpont_group_by_handler(THD* thd, Query* query) unsupported_feature = select_lex->is_correlated; // Impossible HAVING or WHERE - if ( ( !unsupported_feature && select_lex->having_value == Item::COND_FALSE ) - || ( select_lex->cond_count > 0 - && select_lex->cond_value == Item::COND_FALSE ) ) + if (!unsupported_feature && + (select_lex->having_value == Item::COND_FALSE + || select_lex->cond_value == Item::COND_FALSE )) { unsupported_feature = true; } @@ -188,8 +331,7 @@ create_calpont_group_by_handler(THD* thd, Query* query) if (join != 0) icp = reinterpret_cast(join->conds); - if ( unsupported_feature == false - && icp ) + if (unsupported_feature == false && icp) { icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); } @@ -198,16 +340,14 @@ create_calpont_group_by_handler(THD* thd, Query* query) if (select_lex->where != 0) icp = reinterpret_cast(select_lex->where); - if ( unsupported_feature == false - && icp ) + if (unsupported_feature == false && icp) { icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); } - } } // unsupported features check ends here - if ( !unsupported_feature ) + if (!unsupported_feature) { handler = new ha_calpont_group_by_handler(thd, query); @@ -248,25 +388,70 @@ create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived) } SELECT_LEX_UNIT *unit= derived->derived; + SELECT_LEX *sl= unit->first_select(); bool unsupported_feature = false; + // Select_handler use the short-cut that effectively disables + // INSERT..SELECT and LDI + if ( (thd->lex)->sql_command == SQLCOM_INSERT_SELECT + || (thd->lex)->sql_command == SQLCOM_CREATE_TABLE ) { - SELECT_LEX select_lex = *unit->first_select(); - JOIN* join = select_lex.join; - Item_cond* icp = 0; + unsupported_feature = true; + } - if (join != 0) - icp = reinterpret_cast(join->conds); + // Impossible HAVING or WHERE + // TODO replace with function call + if ( unsupported_feature + || sl->having_value == Item::COND_FALSE + || sl->cond_value == Item::COND_FALSE ) + { + unsupported_feature = true; + } - if (!join) + // JOIN expression from WHERE, ON expressions + JOIN* join= sl->join; + //TODO DRRTUY Make a proper tree traverse + //To search for CROSS JOIN-s we use tree invariant + //G(V,E) where [V] = [E]+1 + List join_preds_list; + TABLE_LIST *tl; + for (tl= sl->get_table_list(); tl; tl= tl->next_local) + { + Item_cond* where_icp= 0; + Item_cond* on_icp= 0; + if (tl->where != 0) { - icp = reinterpret_cast(select_lex.where); + where_icp= reinterpret_cast(tl->where); } - if ( icp ) + if (where_icp) { - //icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); + where_icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); + where_icp->traverse_cond(save_join_predicates, &join_preds_list, Item::POSTFIX); } + + // Looking for JOIN with ON expression through + // TABLE_LIST in FROM until CS meets unsupported feature + if (tl->on_expr) + { + on_icp= reinterpret_cast(tl->on_expr); + on_icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); + on_icp->traverse_cond(save_join_predicates, &join_preds_list, Item::POSTFIX); + } + } + + // CROSS JOIN w/o conditions isn't supported until MCOL-301 + // is ready. + if (join && join->table_count >= 2 && !join_preds_list.elements) + { + unsupported_feature= true; + } + + // CROSS JOIN with not enough JOIN predicates + if(!unsupported_feature && join + && join_preds_list.elements < join->table_count-1) + { + unsupported_feature= true; } if ( !unsupported_feature ) @@ -307,16 +492,9 @@ ha_columnstore_derived_handler::~ha_columnstore_derived_handler() ***********************************************************/ int ha_columnstore_derived_handler::init_scan() { - char query_buff[4096]; - DBUG_ENTER("ha_columnstore_derived_handler::init_scan"); - // Save query for logging - String derived_query(query_buff, sizeof(query_buff), thd->charset()); - derived_query.length(0); - derived->derived->print(&derived_query, QT_ORDINARY); - - mcs_handler_info mhi = mcs_handler_info(static_cast(this), DERIVED); + mcs_handler_info mhi = mcs_handler_info(reinterpret_cast(this), DERIVED); // this::table is the place for the result set int rc = ha_cs_impl_pushdown_init(&mhi, table); @@ -465,7 +643,7 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex) bool unsupported_feature = false; // Select_handler use the short-cut that effectively disables // INSERT..SELECT and LDI - if ( (thd->lex)->sql_command == SQLCOM_INSERT_SELECT + if ( (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || (thd->lex)->sql_command == SQLCOM_CREATE_TABLE ) { @@ -481,51 +659,30 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex) unsupported_feature = true; } - // Unsupported query check. - if ( !unsupported_feature ) - { - // JOIN expression from WHERE, ON expressions - JOIN* join = select_lex->join; - Item_cond* where_icp = 0; - Item_cond* on_icp = 0; - - if (join != 0) - { - where_icp = reinterpret_cast(join->conds); - } - - if ( where_icp ) - { - //where_icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); - } - - // Looking for JOIN with ON expression through - // TABLE_LIST in FROM until CS meets unsupported feature - TABLE_LIST* table_ptr = select_lex->get_table_list(); - for (; !unsupported_feature && table_ptr; table_ptr = table_ptr->next_global) - { - if(table_ptr->on_expr) - { - on_icp = reinterpret_cast(table_ptr->on_expr); - //on_icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); - } - } - - // CROSS JOIN w/o conditions isn't supported until MCOL-301 - // is ready. - if (join && join->table_count >= 2 && ( !where_icp && !on_icp )) - { - unsupported_feature = true; - } - } - + // Next block tries to execute the query using SH very early to fallback + // if execution fails. if (!unsupported_feature) { - handler = new ha_columnstore_select_handler(thd, select_lex); + handler= new ha_columnstore_select_handler(thd, select_lex); mutate_optimizer_flags(thd); + mcs_handler_info mhi= mcs_handler_info(reinterpret_cast(handler), SELECT); + // this::table is the place for the result set + int rc= ha_cs_impl_pushdown_init(&mhi, handler->table); + + // Return SH if query execution is fine or fallback is disabled + if (!rc || !get_fallback_knob(thd)) + return handler; + + // Reset the DA and restore optimizer flags + // to allow query to fallback to other handlers + if (thd->get_stmt_da()->is_error()) + { + thd->get_stmt_da()->reset_diagnostics_area(); + restore_optimizer_flags(thd); + } } - return handler; + return NULL; } /*********************************************************** @@ -561,18 +718,11 @@ ha_columnstore_select_handler::~ha_columnstore_select_handler() ***********************************************************/ int ha_columnstore_select_handler::init_scan() { - char query_buff[4096]; - DBUG_ENTER("ha_columnstore_select_handler::init_scan"); - // Save query for logging - String select_query(query_buff, sizeof(query_buff), thd->charset()); - select_query.length(0); - select->print(thd, &select_query, QT_ORDINARY); - - mcs_handler_info mhi = mcs_handler_info(static_cast(this), SELECT); - // this::table is the place for the result set - int rc = ha_cs_impl_pushdown_init(&mhi, table); + // Dummy init for SH. Actual init happens in create_SH + // to allow fallback to other handlers if SH fails. + int rc = 0; DBUG_RETURN(rc); } @@ -591,7 +741,7 @@ int ha_columnstore_select_handler::next_row() { DBUG_ENTER("ha_columnstore_select_handler::next_row"); - int rc = ha_calpont_impl_rnd_next(table->record[0], table); + int rc= ha_cs_impl_select_next(table->record[0], table); DBUG_RETURN(rc); } diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index b0fc0bb0b..4c5f0b101 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -98,7 +98,14 @@ static MYSQL_THDVAR_BOOL( 1 ); - +static MYSQL_THDVAR_BOOL( + processing_handlers_fallback, + PLUGIN_VAR_NOCMDARG, + "Enable/Disable the unsupported features check in handlers.", + NULL, + NULL, + 0 +); // legacy system variables static MYSQL_THDVAR_ULONG( @@ -269,15 +276,6 @@ static MYSQL_THDVAR_BOOL( 1 // default ); -static MYSQL_THDVAR_BOOL( - use_legacy_sysvars, - PLUGIN_VAR_NOCMDARG, - "Control CS behavior using legacy * sysvars", - NULL, // check - NULL, // update - 0 // default -); - st_mysql_sys_var* mcs_system_variables[] = { MYSQL_SYSVAR(compression_type), @@ -285,6 +283,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(original_optimizer_flags), MYSQL_SYSVAR(select_handler), MYSQL_SYSVAR(derived_handler), + MYSQL_SYSVAR(processing_handlers_fallback), MYSQL_SYSVAR(group_by_handler), MYSQL_SYSVAR(decimal_scale), MYSQL_SYSVAR(use_decimal_scale), @@ -300,7 +299,6 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(use_import_for_batchinsert), MYSQL_SYSVAR(import_for_batchinsert_delimiter), MYSQL_SYSVAR(import_for_batchinsert_enclosed_by), - MYSQL_SYSVAR(use_legacy_sysvars), MYSQL_SYSVAR(varbin_always_hex), NULL }; @@ -323,8 +321,7 @@ void set_fe_conn_info_ptr(void* ptr, THD* thd) ulonglong get_original_optimizer_flags(THD* thd) { - return ( current_thd == NULL && thd == NULL ) ? NULL : - THDVAR(current_thd, original_optimizer_flags); + return THDVAR(current_thd, original_optimizer_flags); } void set_original_optimizer_flags(ulonglong ptr, THD* thd) @@ -364,7 +361,16 @@ void set_group_by_handler(THD* thd, bool value) THDVAR(thd, group_by_handler) = value; } -void set_compression_type(THD* thd, ulong value) +bool get_fallback_knob(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, processing_handlers_fallback); +} +void set_fallback_knob(THD* thd, bool value) +{ + THDVAR(thd, processing_handlers_fallback) = value; +} + + void set_compression_type(THD* thd, ulong value) { THDVAR(thd, compression_type) = value; } diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index 38ed294cf..cf3f06822 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -52,6 +52,9 @@ void set_derived_handler(THD* thd, bool value); bool get_group_by_handler(THD* thd); void set_group_by_handler(THD* thd, bool value); +bool get_fallback_knob(THD* thd); +void set_fallback_knob(THD* thd, bool value); + bool get_use_decimal_scale(THD* thd); void set_use_decimal_scale(THD* thd, bool value); diff --git a/dbcon/mysql/ha_view.cpp b/dbcon/mysql/ha_view.cpp index 65707e4c7..2cba59b09 100644 --- a/dbcon/mysql/ha_view.cpp +++ b/dbcon/mysql/ha_view.cpp @@ -110,8 +110,6 @@ void View::transform() // for nested view, the view name is vout.vin... format CalpontSystemCatalog::TableAliasName tn = make_aliasview(table_ptr->db.str, table_ptr->table_name.str, table_ptr->alias.str, viewName); gwi.viewName = make_aliastable(table_ptr->db.str, table_ptr->table_name.str, viewName); - // WIP MCOL-2178 CS could mess with the SELECT_LEX unit so better - // use a copy. View* view = new View(*table_ptr->view->first_select_lex(), &gwi); view->viewName(gwi.viewName); gwi.viewList.push_back(view); diff --git a/dbcon/mysql/sm.cpp b/dbcon/mysql/sm.cpp index e3dca6a56..a7e476f6f 100644 --- a/dbcon/mysql/sm.cpp +++ b/dbcon/mysql/sm.cpp @@ -288,7 +288,7 @@ tpl_open ( tableid_t tableid, SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl; // if first time enter this function for a statement, set - // queryState to QUERY_IN_PRCOESS and get execution plan. + // queryState to QUERY_IN_PROCESS and get execution plan. if (conn_hdl->queryState == NO_QUERY) { conn_hdl->queryState = QUERY_IN_PROCESS;