From 01ff2652a6b3e4a64ca26bba40fcbacad35aa7b9 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Fri, 29 May 2020 22:30:34 -0400 Subject: [PATCH] MCOL-4023 Pushdown WHERE conditions for UPDATE/DELETE. For certain queries, such as: update cs1 set i = 41 where i = 42 or (i is null and 42 is null); SELECT_LEX.where does not contain the required WHERE conditions. The server sends the WHERE conditions in the call to cond_push(), so we store them in a handler data member, condStack, and later push them down to getSelectPlan() for UPDATEs/DELETEs. --- dbcon/mysql/ha_mcs.cpp | 27 +++++++++++++--- dbcon/mysql/ha_mcs.h | 7 +++++ dbcon/mysql/ha_mcs_execplan.cpp | 56 ++++++++++++++++++++++++++++++--- dbcon/mysql/ha_mcs_impl.cpp | 22 +++++++------ dbcon/mysql/ha_mcs_impl.h | 6 ++-- dbcon/mysql/ha_mcs_impl_if.h | 2 +- 6 files changed, 98 insertions(+), 22 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 5c2c3560c..1c771063d 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -448,7 +448,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows); + rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack); } catch (std::runtime_error& e) { @@ -464,7 +464,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows); + rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack); *found_rows = *update_rows; } catch (std::runtime_error& e) @@ -487,7 +487,7 @@ int ha_mcs::direct_delete_rows(ha_rows *deleted_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows); + rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows, condStack); } catch (std::runtime_error& e) { @@ -629,7 +629,7 @@ int ha_mcs::rnd_init(bool scan) { try { - rc = ha_mcs_impl_rnd_init(table); + rc = ha_mcs_impl_rnd_init(table, 
condStack); } catch (std::runtime_error& e) { @@ -1110,7 +1110,7 @@ const COND* ha_mcs::cond_push(const COND* cond) COND* ret_cond = NULL; try { - ret_cond = ha_mcs_impl_cond_push(const_cast<COND*>(cond), table); + ret_cond = ha_mcs_impl_cond_push(const_cast<COND*>(cond), table, condStack); } catch (std::runtime_error& e) { @@ -1119,6 +1119,23 @@ const COND* ha_mcs::cond_push(const COND* cond) DBUG_RETURN(ret_cond); } +void ha_mcs::cond_pop() +{ + DBUG_ENTER("ha_mcs::cond_pop"); + + THD* thd = current_thd; + + if ((((thd->lex)->sql_command == SQLCOM_UPDATE) || + ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) || + ((thd->lex)->sql_command == SQLCOM_DELETE) || + ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) && + !condStack.empty()) + { + condStack.pop_back(); + } + + DBUG_VOID_RETURN; +} struct st_mysql_storage_engine columnstore_storage_engine = { MYSQL_HANDLERTON_INTERFACE_VERSION }; diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 8baf31e7e..a82ea07c5 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -18,6 +18,7 @@ MA 02110-1301, USA. */ #ifndef HA_MCS_H__ #define HA_MCS_H__ + #include <vector> #include "idb_mysql.h" #include "ha_mcs_sysvars.h" @@ -44,6 +45,11 @@ class ha_mcs: public handler THR_LOCK_DATA lock; ///< MySQL lock COLUMNSTORE_SHARE* share; ///< Shared lock info ulonglong int_table_flags; + // We are using a vector here to mimic the stack functionality + // using push_back() and pop_back() + // as apparently there is a linker error on the std::stack::pop() + // call on Ubuntu18. 
+ std::vector<COND*> condStack; public: ha_mcs(handlerton* hton, TABLE_SHARE* table_arg); @@ -222,6 +228,7 @@ public: THR_LOCK_DATA** store_lock(THD* thd, THR_LOCK_DATA** to, enum thr_lock_type lock_type); ///< required const COND* cond_push(const COND* cond); + void cond_pop() override; uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 21ec8eddb..991cb556f 100755 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -6333,10 +6333,12 @@ int processFrom(bool &isUnion, int processWhere(SELECT_LEX &select_lex, gp_walk_info &gwi, SCSEP &csep, - List &on_expr_list) + List &on_expr_list, + const std::vector<COND*>& condStack) { JOIN* join = select_lex.join; Item_cond* icp = 0; + bool isUpdateDelete = false; if (join != 0) icp = reinterpret_cast<Item_cond*>(join->conds); @@ -6354,7 +6356,7 @@ int processWhere(SELECT_LEX &select_lex, ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) || ((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI ))) { - icp = reinterpret_cast<Item_cond*>(select_lex.where); + isUpdateDelete = true; } if (icp) @@ -6393,6 +6395,51 @@ int processWhere(SELECT_LEX &select_lex, return ER_INTERNAL_ERROR; } } + else if (isUpdateDelete) + { + // MCOL-4023 For updates/deletes, we iterate over the pushed down condStack + if (!condStack.empty()) + { + std::vector<COND*>::const_iterator condStackIter = condStack.begin(); + + while (condStackIter != condStack.end()) + { + COND* cond = *condStackIter++; + + cond->traverse_cond(gp_walk, &gwi, Item::POSTFIX); + + if (gwi.fatalParseError) + { + if (gwi.thd->derived_tables_processing) + { + gwi.cs_vtable_is_update_with_derive = true; + return -1; + } + + setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); + return ER_INTERNAL_ERROR; + } + } + } + // if condStack is empty(), check the select_lex for where conditions + // as a last resort + else if ((icp = reinterpret_cast<Item_cond*>(select_lex.where)) != 0) + { + icp->traverse_cond(gp_walk, 
&gwi, Item::POSTFIX); + + if (gwi.fatalParseError) + { + if (gwi.thd->derived_tables_processing) + { + gwi.cs_vtable_is_update_with_derive = true; + return -1; + } + + setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); + return ER_INTERNAL_ERROR; + } + } + } else if (join && join->zero_result_cause) { gwi.rcWorkStack.push(new ConstantColumn((int64_t)0, ConstantColumn::NUM)); @@ -6688,7 +6735,8 @@ int processLimitAndOffset( int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion, - bool isSelectHandlerTop) + bool isSelectHandlerTop, + const std::vector<COND*>& condStack) { #ifdef DEBUG_WALK_COND cerr << "getSelectPlan()" << endl; @@ -6724,7 +6772,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false; gwi.clauseType = WHERE; - if ((rc = processWhere(select_lex, gwi, csep, on_expr_list))) + if ((rc = processWhere(select_lex, gwi, csep, on_expr_list, condStack))) { return rc; } diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index c920c8657..c6838b025 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -1194,7 +1194,7 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in return returnVal; } -uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) +uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& condStack) { if (get_fe_conn_info_ptr() == nullptr) set_fe_conn_info_ptr((void*)new cal_connection_info()); @@ -1780,7 +1780,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) gwi.clauseType = WHERE; - if (getSelectPlan(gwi, select_lex, updateCP, false) != 0) //@Bug 3030 Modify the error message for unsupported functions + if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) != 0) //@Bug 3030 Modify the error message for unsupported functions { if (gwi.cs_vtable_is_update_with_derive) { @@ -2284,7 +2284,7 @@ int 
ha_mcs_impl_discover_existence(const char* schema, const char* name) return 0; } -int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) +int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector<COND*>& condStack) { THD* thd = current_thd; int rc = 0; @@ -2308,7 +2308,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) if (execute) { - rc = doUpdateDelete(thd, gwi); + rc = doUpdateDelete(thd, gwi, condStack); } cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); @@ -2320,7 +2320,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) return rc; } -int ha_mcs_impl_rnd_init(TABLE* table) +int ha_mcs_impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack) { IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl ); THD* thd = current_thd; @@ -2384,7 +2384,7 @@ int ha_mcs_impl_rnd_init(TABLE* table) //Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) - return doUpdateDelete(thd, gwi); + return doUpdateDelete(thd, gwi, condStack); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); @@ -3985,7 +3985,7 @@ int ha_mcs_impl_delete_row(const uchar* buf) return 0; } -COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table) +COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condStack) { THD* thd = current_thd; @@ -3993,7 +3993,10 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table) ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) - return cond; + { + condStack.push_back(cond); + return nullptr; + } string alias; alias.assign(table->alias.ptr(), 
table->alias.length()); @@ -4959,9 +4962,10 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return 0; + // MCOL-4023 We need to test this code path. //Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) - return doUpdateDelete(thd, gwi); + return doUpdateDelete(thd, gwi, std::vector<COND*>()); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); diff --git a/dbcon/mysql/ha_mcs_impl.h b/dbcon/mysql/ha_mcs_impl.h index 87ce22ae7..9024130ba 100644 --- a/dbcon/mysql/ha_mcs_impl.h +++ b/dbcon/mysql/ha_mcs_impl.h @@ -29,7 +29,7 @@ extern int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO extern int ha_mcs_impl_delete_table(const char* name); extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked); extern int ha_mcs_impl_close(void); -extern int ha_mcs_impl_rnd_init(TABLE* table); +extern int ha_mcs_impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack); extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table); extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false); extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed); @@ -39,10 +39,10 @@ extern int ha_mcs_impl_rename_table(const char* from, const char* to); extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all); extern int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all); extern int ha_mcs_impl_close_connection (handlerton* hton, THD* thd); -extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table); +extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>&); extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type); extern int ha_mcs_impl_update_row(); 
-extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows); +extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector<COND*>& condStack); extern int ha_mcs_impl_delete_row(); extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos); extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table); diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 9975605b7..3d2ae4cfe 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -342,7 +342,7 @@ int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_in int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi); int cs_get_derived_plan(derived_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi); int cs_get_select_plan(select_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi); -int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false); +int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false, const std::vector<COND*>& condStack = std::vector<COND*>()); int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false); void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg);