From 6579180810f613a58370f0874990829ed0667752 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Tue, 31 Oct 2023 12:54:18 +0000 Subject: [PATCH] fix(plugin): MCOL-4740: This fixes update rows counter for multi-table update For UPDATEs involving a single table, the server call to handler::direct_update_rows() is used to correctly set the count for the number of updated rows in the UPDATE statement. However, for UPDATEs involving multi-tables, the server does not call handler::direct_update_rows(). This patch adds support to correctly report the number of updated rows to the client by setting multi_update::updated and multi_update::found in handler::rnd_end(). --- dbcon/mysql/ha_mcs.cpp | 37 ++++++++ dbcon/mysql/ha_mcs_common.h | 148 ++++++++++++++++++++++++++++++ dbcon/mysql/ha_mcs_execplan.cpp | 120 +++---------------------- dbcon/mysql/ha_mcs_impl.cpp | 155 +++++++++++++++++--------------- dbcon/mysql/ha_mcs_impl_if.h | 26 ------ dbcon/mysql/ha_mcs_pushdown.cpp | 37 ++++---- dbcon/mysql/ha_mcs_pushdown.h | 1 + dbcon/mysql/ha_view.cpp | 3 +- 8 files changed, 302 insertions(+), 225 deletions(-) create mode 100644 dbcon/mysql/ha_mcs_common.h diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index c735dbc45..d0804470f 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -16,6 +16,14 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +// This makes specific MDB classes' attributes public to implement +// MCOL-4740 temporary solution. Search for MCOL-4740 +// to get the actual place where it is used. +#define updated_leaves \ + updated_leaves; \ + \ + public: + #include "ha_mcs.h" #include "maria_def.h" #include @@ -614,10 +622,39 @@ int ha_mcs::rnd_init(bool scan) DBUG_RETURN(rc); } +void update_counters_on_multi_update() +{ + if (ha_mcs_common::isMultiUpdateStatement(current_thd->lex->sql_command) && + !ha_mcs_common::isForeignTableUpdate(current_thd)) + { + SELECT_LEX_UNIT* unit = ¤t_thd->lex->unit; + SELECT_LEX* select_lex = unit->first_select(); + auto* multi = (select_lex->join) ? reinterpret_cast(select_lex->join->result) : nullptr; + + if (multi) + { + multi->table_to_update = multi->update_tables ? multi->update_tables->table : 0; + + cal_impl_if::cal_connection_info* ci = + reinterpret_cast(get_fe_conn_info_ptr()); + + if (ci) + { + multi->updated = multi->found = ci->affectedRows; + } + } + } +} + int ha_mcs::rnd_end() { DBUG_ENTER("ha_mcs::rnd_end"); + // MCOL-4740 multi_update::send_eof(), which outputs the affected + // number of rows to the client, is called after handler::rnd_end(). + // So we set multi_update::updated and multi_update::found here. + update_counters_on_multi_update(); + int rc; try { diff --git a/dbcon/mysql/ha_mcs_common.h b/dbcon/mysql/ha_mcs_common.h new file mode 100644 index 000000000..812c2514d --- /dev/null +++ b/dbcon/mysql/ha_mcs_common.h @@ -0,0 +1,148 @@ +/* Copyright (C) 2016-2023 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include "idb_mysql.h" + +namespace ha_mcs_common +{ + +inline bool isMCSTable(TABLE* table_ptr) +{ +#if (defined(_MSC_VER) && defined(_DEBUG)) || defined(SAFE_MUTEX) + + if (!(table_ptr->s && (*table_ptr->s->db_plugin)->name.str)) +#else + if (!(table_ptr->s && (table_ptr->s->db_plugin)->name.str)) +#endif + return true; + +#if (defined(_MSC_VER) && defined(_DEBUG)) || defined(SAFE_MUTEX) + std::string engineName = (*table_ptr->s->db_plugin)->name.str; +#else + std::string engineName = table_ptr->s->db_plugin->name.str; +#endif + + if (engineName == "Columnstore" || engineName == "Columnstore_cache") + return true; + return false; +} + +inline bool isMultiUpdateStatement(const enum_sql_command& command) +{ + return command == SQLCOM_UPDATE_MULTI; +} + +inline bool isUpdateStatement(const enum_sql_command& command) +{ + return command == SQLCOM_UPDATE || isMultiUpdateStatement(command); +} + +inline bool isDeleteStatement(const enum_sql_command& command) +{ + return (command == SQLCOM_DELETE) || (command == SQLCOM_DELETE_MULTI); +} + +inline bool isUpdateOrDeleteStatement(const enum_sql_command& command) +{ + return isUpdateStatement(command) || isDeleteStatement(command); +} + +inline bool isDMLStatement(const enum_sql_command& command) +{ + return (command == SQLCOM_INSERT || command == SQLCOM_INSERT_SELECT || command == SQLCOM_TRUNCATE || + command == SQLCOM_LOAD || isUpdateOrDeleteStatement(command)); +} + +inline bool isForeignTableUpdate(THD* thd) +{ + LEX* lex = thd->lex; + + if (!isUpdateStatement(lex->sql_command)) + return false; + + Item_field* item; + List_iterator_fast field_it(lex->first_select_lex()->item_list); + + while ((item = (Item_field*)field_it++)) + { + if (item->field && item->field->table && !isMCSTable(item->field->table)) + return true; + } + + return false; +} + +// This function is different from isForeignTableUpdate() +// above as it only checks if any of the tables involved +// in the multi-table update statement is a foreign table, +// irrespective of whether the update is performed on the +// foreign table or not, as in isForeignTableUpdate(). +inline bool isUpdateHasForeignTable(THD* thd) +{ + LEX* lex = thd->lex; + + if (!isUpdateStatement(lex->sql_command)) + return false; + + TABLE_LIST* table_ptr = lex->first_select_lex()->get_table_list(); + + for (; table_ptr; table_ptr = table_ptr->next_local) + { + if (table_ptr->table && !isMCSTable(table_ptr->table)) + return true; + } + + return false; +} + +inline bool isMCSTableUpdate(THD* thd) +{ + LEX* lex = thd->lex; + + if (!isUpdateStatement(lex->sql_command)) + return false; + + Item_field* item; + List_iterator_fast field_it(lex->first_select_lex()->item_list); + + while ((item = (Item_field*)field_it++)) + { + if (item->field && item->field->table && isMCSTable(item->field->table)) + return true; + } + + return false; +} + +inline bool isMCSTableDelete(THD* thd) +{ + LEX* lex = thd->lex; + + if (!isDeleteStatement(lex->sql_command)) + return false; + + TABLE_LIST* table_ptr = lex->first_select_lex()->get_table_list(); + + if (table_ptr && table_ptr->table && isMCSTable(table_ptr->table)) + return true; + + return false; +} + +} // namespace ha_mcs_common diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index d2d5bebe8..a169e02ef 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -4693,10 +4693,10 @@ SimpleColumn* buildSimpleColumn(Item_field* ifp, gp_walk_info& gwi) { // check foreign engine if (ifp->cached_table && ifp->cached_table->table) - prm.columnStore(isMCSTable(ifp->cached_table->table)); + prm.columnStore(ha_mcs_common::isMCSTable(ifp->cached_table->table)); // @bug4509. ifp->cached_table could be null for myisam sometimes else if (ifp->field && ifp->field->table) - prm.columnStore(isMCSTable(ifp->field->table)); + prm.columnStore(ha_mcs_common::isMCSTable(ifp->field->table)); if (prm.columnStore()) { @@ -6529,104 +6529,6 @@ void parse_item(Item* item, vector& field_vec, bool& hasNonSupportI } } -bool isMCSTable(TABLE* table_ptr) -{ -#if (defined(_MSC_VER) && defined(_DEBUG)) || defined(SAFE_MUTEX) - - if (!(table_ptr->s && (*table_ptr->s->db_plugin)->name.str)) -#else - if (!(table_ptr->s && (table_ptr->s->db_plugin)->name.str)) -#endif - return true; - -#if (defined(_MSC_VER) && defined(_DEBUG)) || defined(SAFE_MUTEX) - string engineName = (*table_ptr->s->db_plugin)->name.str; -#else - string engineName = table_ptr->s->db_plugin->name.str; -#endif - - if (engineName == "Columnstore" || engineName == "Columnstore_cache") - return true; - else - return false; -} - -bool isForeignTableUpdate(THD* thd) -{ - LEX* lex = thd->lex; - - if (!isUpdateStatement(lex->sql_command)) - return false; - - Item_field* item; - List_iterator_fast field_it(lex->first_select_lex()->item_list); - - while ((item = (Item_field*)field_it++)) - { - if (item->field && item->field->table && !isMCSTable(item->field->table)) - return true; - } - - return false; -} - -bool isMCSTableUpdate(THD* thd) -{ - LEX* lex = thd->lex; - - if (!isUpdateStatement(lex->sql_command)) - return false; - - Item_field* item; - List_iterator_fast field_it(lex->first_select_lex()->item_list); - - while ((item = (Item_field*)field_it++)) - { - if (item->field && item->field->table && isMCSTable(item->field->table)) - return true; - } - - return false; -} - -bool isMCSTableDelete(THD* thd) -{ - LEX* lex = thd->lex; - - if (!isDeleteStatement(lex->sql_command)) - return false; - - TABLE_LIST* table_ptr = lex->first_select_lex()->get_table_list(); - - if (table_ptr && table_ptr->table && isMCSTable(table_ptr->table)) - return true; - - return false; -} - -// This function is different from isForeignTableUpdate() -// above as it only checks if any of the tables involved -// in the multi-table update statement is a foreign table, -// irrespective of whether the update is performed on the -// foreign table or not, as in isForeignTableUpdate(). -bool isUpdateHasForeignTable(THD* thd) -{ - LEX* lex = thd->lex; - - if (!isUpdateStatement(lex->sql_command)) - return false; - - TABLE_LIST* table_ptr = lex->first_select_lex()->get_table_list(); - - for (; table_ptr; table_ptr = table_ptr->next_local) - { - if (table_ptr->table && !isMCSTable(table_ptr->table)) - return true; - } - - return false; -} - /*@brief set some runtime params to run the query */ /*********************************************************** * DESCRIPTION: @@ -6750,7 +6652,7 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& else { // check foreign engine tables - bool columnStore = (table_ptr->table ? isMCSTable(table_ptr->table) : true); + bool columnStore = (table_ptr->table ? ha_mcs_common::isMCSTable(table_ptr->table) : true); // trigger system catalog cache if (columnStore) @@ -6881,7 +6783,7 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s else if (select_lex.where) icp = select_lex.where; } - else if (!join && isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) + else if (!join && ha_mcs_common::isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) { isUpdateDelete = true; } @@ -7657,7 +7559,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i } //@Bug 3030 Add error check for dml statement - if (isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) { if (after_size - before_size != 0) { @@ -7691,7 +7593,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i case REAL_RESULT: case TIME_RESULT: { - if (isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) { } else @@ -7724,7 +7626,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i case Item::NULL_ITEM: { - if (isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) { } else @@ -9052,7 +8954,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro else { // check foreign engine tables - bool columnStore = (table_ptr->table ? isMCSTable(table_ptr->table) : true); + bool columnStore = (table_ptr->table ? ha_mcs_common::isMCSTable(table_ptr->table) : true); // trigger system catalog cache if (columnStore) @@ -9519,7 +9421,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro } //@Bug 3030 Add error check for dml statement - if (isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) { if (after_size - before_size != 0) { @@ -9550,7 +9452,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro case REAL_RESULT: case TIME_RESULT: { - if (isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) { } else @@ -9583,7 +9485,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro case Item::NULL_ITEM: { - if (isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(gwi.thd->lex->sql_command)) { } else diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 9eeac01c6..4c57e0590 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -302,18 +302,18 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, long t { // @bug 2244. Always log this msg for now, as we try to track down when/why we are // losing socket connection with ExeMgr - //#ifdef INFINIDB_DEBUG + // #ifdef INFINIDB_DEBUG tpl_scan_fetch_LogException(ti, ci, &ex); - //#endif + // #endif sm_stat = sm::CALPONT_INTERNAL_ERROR; } catch (...) { // @bug 2244. Always log this msg for now, as we try to track down when/why we are // losing socket connection with ExeMgr - //#ifdef INFINIDB_DEBUG + // #ifdef INFINIDB_DEBUG tpl_scan_fetch_LogException(ti, ci, 0); - //#endif + // #endif sm_stat = sm::CALPONT_INTERNAL_ERROR; } @@ -758,7 +758,8 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& condStack) { - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -827,7 +828,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c { Message::Args args; - if (isUpdateStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) args.add("Update"); #if 0 else if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0) @@ -884,7 +885,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c updateCP->isDML(true); //@Bug 2753. the memory already freed by destructor of UpdateSqlStatement - if (isUpdateStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { ColumnAssignment* columnAssignmentPtr; Item_field* item; @@ -896,7 +897,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c while ((item = (Item_field*)field_it++)) { - string tmpTableName = bestTableName(item); //@Bug 5312 populate aliasname with tablename if it is empty @@ -1174,7 +1174,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c } // Exit early if there is nothing to update - if (colAssignmentListPtr->empty() && isUpdateStatement(thd->lex->sql_command)) + if (colAssignmentListPtr->empty() && ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { ci->affectedRows = 0; return 0; @@ -1187,7 +1187,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c CalpontSystemCatalog::TableName aTableName; TABLE_LIST* first_table = 0; - if (isUpdateStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { aTableName.schema = schemaName; aTableName.table = tableName; @@ -1209,7 +1209,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c IDEBUG(cout << "STMT: " << dmlStmt << " and sessionID " << thd->thread_id << endl); VendorDMLStatement dmlStatement(dmlStmt, sessionID); - if (isUpdateStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) dmlStatement.set_DMLStatementType(DML_UPDATE); else dmlStatement.set_DMLStatementType(DML_DELETE); @@ -1220,7 +1220,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c //@Bug 2753. To make sure the momory is freed. updateStmt.fColAssignmentListPtr = colAssignmentListPtr; - if (isUpdateStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { qualifiedTablName->fName = tableName; qualifiedTablName->fSchema = schemaName; @@ -1314,7 +1314,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c List items; SELECT_LEX select_lex; - if (isUpdateStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { items = (thd->lex->first_select_lex()->item_list); thd->lex->first_select_lex()->item_list = thd->lex->value_list; @@ -1463,7 +1463,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c returnedCols.push_back((updateCP->columnMap()).begin()->second); //@Bug 6123. get the correct returned columnlist - if (isDeleteStatement(thd->lex->sql_command)) + if (ha_mcs_common::isDeleteStatement(thd->lex->sql_command)) { returnedCols.clear(); // choose the smallest column to project @@ -1514,7 +1514,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c updateCP->returnedCols(returnedCols); - if (isUpdateStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { const ParseTree* ptsub = updateCP->filters(); @@ -1973,7 +1973,7 @@ int ha_mcs_impl_analyze(THD* thd, TABLE* table) if (table->s->db.length && strcmp(table->s->db.str, "information_schema") == 0) return 0; - bool columnStore = (table ? isMCSTable(table) : true); + bool columnStore = (table ? ha_mcs_common::isMCSTable(table) : true); // Skip non columnstore tables. if (!columnStore) return 0; @@ -2041,7 +2041,8 @@ int ha_mcs_impl_analyze(THD* thd, TABLE* table) query.assign(idb_mysql_query_str(thd)); caep->data(query); - if (!get_fe_conn_info_ptr()) { + if (!get_fe_conn_info_ptr()) + { set_fe_conn_info_ptr(reinterpret_cast(new cal_connection_info(), thd)); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2147,7 +2148,8 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected_rows, gwi.thd = thd; int rc = 0; - if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command)) + if (thd->slave_thread && !get_replication_slave(thd) && + ha_mcs_common::isDMLStatement(thd->lex->sql_command)) { if (affected_rows) *affected_rows = 0; @@ -2179,11 +2181,10 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) gwi.thd = thd; if (thd->slave_thread && !get_replication_slave(thd) && - (isDMLStatement(thd->lex->sql_command) || - thd->lex->sql_command == SQLCOM_ALTER_TABLE)) + (ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE)) return 0; - // check whether the system is ready to process statement. + // check whether the system is ready to process statement. static DBRM dbrm(true); int bSystemQueryReady = dbrm.getSystemQueryReady(); @@ -2232,14 +2233,15 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) UPDATE innotab1 SET a=100 WHERE a NOT IN (SELECT a FROM cstab1 WHERE a=1); */ if (!isReadOnly() && // make sure the current table is being modified - isUpdateOrDeleteStatement(thd->lex->sql_command)) + ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command)) return doUpdateDelete(thd, gwi, condStack); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); csc->identity(CalpontSystemCatalog::FE); - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2555,11 +2557,10 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table, long timeZone) THD* thd = current_thd; if (thd->slave_thread && !get_replication_slave(thd) && - (isDMLStatement(thd->lex->sql_command) || - thd->lex->sql_command == SQLCOM_ALTER_TABLE)) + (ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE)) return HA_ERR_END_OF_FILE; - if (isMCSTableUpdate(thd) || isMCSTableDelete(thd)) + if (ha_mcs_common::isMCSTableUpdate(thd) || ha_mcs_common::isMCSTableDelete(thd)) return HA_ERR_END_OF_FILE; // @bug 2547 @@ -2567,7 +2568,8 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table, long timeZone) // if (MIGR::infinidb_vtable.impossibleWhereOnUnion) // return HA_ERR_END_OF_FILE; - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2638,8 +2640,7 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) THD* thd = current_thd; if (thd->slave_thread && !get_replication_slave(thd) && - (isDMLStatement(thd->lex->sql_command) || - thd->lex->sql_command == SQLCOM_ALTER_TABLE)) + (ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE)) return 0; cal_connection_info* ci = nullptr; @@ -2650,7 +2651,7 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) if ((thd->lex)->sql_command == SQLCOM_ALTER_TABLE) return rc; - if (isMCSTableUpdate(thd) || isMCSTableDelete(thd)) + if (ha_mcs_common::isMCSTableUpdate(thd) || ha_mcs_common::isMCSTableDelete(thd)) return rc; if (!ci) @@ -2663,18 +2664,18 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) if (thd->lex->analyze_stmt && ci->cal_conn_hndl && ci->cal_conn_hndl->exeMgr) { // The ANALYZE statement leaves ExeMgr hanging. This clears it up. - ci->cal_conn_hndl->exeMgr->read(); // Ignore the returned buffer + ci->cal_conn_hndl->exeMgr->read(); // Ignore the returned buffer ByteStream msg; - ByteStream::quadbyte qb = 1; // Tell PrimProc front session to eat all the rows + ByteStream::quadbyte qb = 1; // Tell PrimProc front session to eat all the rows msg << qb; ci->cal_conn_hndl->exeMgr->write(msg); // This is the command to start sending return values. because we previously sent the swallow // rows command, there won't be anything useful coming back, but it needs this to flush internal queues. - qb = 5; // Read the result data. + qb = 5; // Read the result data. msg.reset(); msg << qb; ci->cal_conn_hndl->exeMgr->write(msg); - qb = 0; // End the query + qb = 0; // End the query msg.reset(); msg << qb; ci->cal_conn_hndl->exeMgr->write(msg); @@ -2775,7 +2776,8 @@ int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* creat { THD* thd = current_thd; - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2819,7 +2821,8 @@ int ha_mcs_impl_delete_table(const char* name) if (!memcmp((uchar*)name, tmp_file_prefix, tmp_file_prefix_length)) return 0; - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2884,7 +2887,8 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed, return ER_CHECK_NOT_IMPLEMENTED; } - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2937,7 +2941,8 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed, int ha_mcs_impl_update_row() { - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2953,7 +2958,8 @@ int ha_mcs_impl_update_row() int ha_mcs_impl_delete_row() { - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2974,7 +2980,8 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins if (thd->slave_thread && !get_replication_slave(thd)) return; - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3101,7 +3108,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->useXbit = false; //@bug 6122 Check how many columns have not null constraint. columnn with not null constraint will not - //show up in header. + // show up in header. unsigned int numberNotNull = 0; for (unsigned int j = 0; j < colrids.size(); j++) @@ -3279,7 +3286,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins // Set read_set used for bulk insertion of Fields inheriting // from Field_blob|Field_varstring. Used in ColWriteBatchString() bitmap_set_all(table->read_set); - } else { @@ -3395,7 +3401,8 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) std::string aTmpDir(startup::StartUp::tmpDir()); - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3421,7 +3428,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) ((thd->lex)->sql_command == SQLCOM_LOAD) || ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert)) { - if ((thd->killed > 0) && (ci->cpimport_pid > 0)) // handle CTRL-C { // cout << "sending ctrl-c to cpimport" << endl; @@ -3500,12 +3506,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) else { ostringstream oss; - oss << "End SQL statement with error, rc=" << rc - << ", aPid=" << aPid - << ", WIF=" << WIFEXITED(aStatus) - << ", WEXIT=" << WEXITSTATUS(aStatus); - ha_mcs_impl::log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, - tid2sid(thd->thread_id)); + oss << "End SQL statement with error, rc=" << rc << ", aPid=" << aPid + << ", WIF=" << WIFEXITED(aStatus) << ", WEXIT=" << WEXITSTATUS(aStatus); + ha_mcs_impl::log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); } ci->columnTypes.clear(); @@ -3591,7 +3594,8 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) int ha_mcs_impl_commit(handlerton* hton, THD* thd, bool all) { - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3626,7 +3630,8 @@ int ha_mcs_impl_commit(handlerton* hton, THD* thd, bool all) int ha_mcs_impl_rollback(handlerton* hton, THD* thd, bool all) { - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3698,7 +3703,8 @@ int ha_mcs_impl_rename_table(const char* from, const char* to) { IDEBUG(cout << "ha_mcs_impl_rename_table: " << from << " => " << to << endl); - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3734,7 +3740,7 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condSt { THD* thd = current_thd; - if (isUpdateOrDeleteStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command)) { condStack.push_back(cond); return nullptr; @@ -3744,7 +3750,8 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condSt alias.assign(table->alias.ptr(), table->alias.length()); IDEBUG(cout << "ha_mcs_impl_cond_push: " << alias << endl); - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3815,7 +3822,7 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condSt inline void disableBinlogForDML(THD* thd) { - if (isDMLStatement(thd->lex->sql_command) && (thd->variables.option_bits & OPTION_BIN_LOG)) + if (ha_mcs_common::isDMLStatement(thd->lex->sql_command) && (thd->variables.option_bits & OPTION_BIN_LOG)) { set_original_option_bits(thd->variables.option_bits, thd); thd->variables.option_bits &= ~OPTION_BIN_LOG; @@ -3825,7 +3832,7 @@ inline void disableBinlogForDML(THD* thd) inline void restoreBinlogForDML(THD* thd) { - if (isDMLStatement(thd->lex->sql_command)) + if (ha_mcs_common::isDMLStatement(thd->lex->sql_command)) { ulonglong orig_option_bits = get_original_option_bits(thd); @@ -3853,7 +3860,8 @@ int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type) alias.assign(table->alias.ptr(), table->alias.length()); IDEBUG(cout << "external_lock for " << alias << endl); - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -4011,12 +4019,12 @@ int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table) return ER_INTERNAL_ERROR; } - uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); csc->identity(CalpontSystemCatalog::FE); - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -4429,13 +4437,15 @@ int ha_mcs_impl_group_by_next(TABLE* table, long timeZone) { THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command)) + if (thd->slave_thread && !get_replication_slave(thd) && + ha_mcs_common::isDMLStatement(thd->lex->sql_command)) return HA_ERR_END_OF_FILE; - if (isMCSTableUpdate(thd) || isMCSTableDelete(thd)) + if (ha_mcs_common::isMCSTableUpdate(thd) || ha_mcs_common::isMCSTableDelete(thd)) return HA_ERR_END_OF_FILE; - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -4506,7 +4516,8 @@ int ha_mcs_impl_group_by_end(TABLE* table) int rc = 0; THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command)) + if (thd->slave_thread && !get_replication_slave(thd) && + ha_mcs_common::isDMLStatement(thd->lex->sql_command)) return 0; cal_connection_info* ci = nullptr; @@ -4672,7 +4683,8 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool IDEBUG(cout << "pushdown_init for table " << endl); THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command)) + if (thd->slave_thread && !get_replication_slave(thd) && + ha_mcs_common::isDMLStatement(thd->lex->sql_command)) return 0; const char* timeZone = thd->variables.time_zone->get_name()->ptr(); @@ -4713,14 +4725,15 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool // MCOL-4023 We need to test this code path. // Update and delete code - if (isUpdateOrDeleteStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command)) return doUpdateDelete(thd, gwi, std::vector()); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); csc->identity(CalpontSystemCatalog::FE); - if (!get_fe_conn_info_ptr()) { + if (!get_fe_conn_info_ptr()) + { set_fe_conn_info_ptr(reinterpret_cast(new cal_connection_info(), thd)); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -5118,19 +5131,21 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table, long timeZone) { THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command)) + if (thd->slave_thread && !get_replication_slave(thd) && + ha_mcs_common::isDMLStatement(thd->lex->sql_command)) return HA_ERR_END_OF_FILE; int rc = HA_ERR_END_OF_FILE; - if (get_fe_conn_info_ptr() == nullptr) { + if (get_fe_conn_info_ptr() == nullptr) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); } cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (isUpdateOrDeleteStatement(thd->lex->sql_command)) + if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command)) return rc; // @bug 2547 diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index b3a9917c1..0bcfebc36 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -399,11 +399,6 @@ void gp_walk(const Item* item, void* arg); void parse_item(Item* item, std::vector& field_vec, bool& hasNonSupportItem, uint16& parseInfo, gp_walk_info* gwip = NULL); const std::string bestTableName(const Item_field* ifp); -bool isMCSTable(TABLE* table_ptr); -bool isForeignTableUpdate(THD* thd); -bool isUpdateHasForeignTable(THD* thd); -bool isMCSTableUpdate(THD* thd); -bool isMCSTableDelete(THD* thd); // execution plan util functions prototypes execplan::ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupport, @@ -450,27 +445,6 @@ bool buildEqualityPredicate(execplan::ReturnedColumn* lhs, execplan::ReturnedCol boost::shared_ptr& sop, const Item_func::Functype& funcType, const std::vector& itemList, bool isInSubs = false); -inline bool isUpdateStatement(const enum_sql_command& command) -{ - return ((command == SQLCOM_UPDATE) || (command == SQLCOM_UPDATE_MULTI)); -} - -inline bool isDeleteStatement(const enum_sql_command& command) -{ - return (command == SQLCOM_DELETE) || (command == SQLCOM_DELETE_MULTI); -} - -inline bool isUpdateOrDeleteStatement(const enum_sql_command& command) -{ - return isUpdateStatement(command) || isDeleteStatement(command); -} - -inline bool isDMLStatement(const enum_sql_command& command) -{ - return (command == SQLCOM_INSERT || command == SQLCOM_INSERT_SELECT || command == SQLCOM_TRUNCATE || - command == SQLCOM_LOAD || isUpdateOrDeleteStatement(command)); -} - #ifdef DEBUG_WALK_COND void debug_walk(const Item* item, void* arg); #endif diff --git a/dbcon/mysql/ha_mcs_pushdown.cpp b/dbcon/mysql/ha_mcs_pushdown.cpp index b90e866df..f38681d10 100644 --- a/dbcon/mysql/ha_mcs_pushdown.cpp +++ b/dbcon/mysql/ha_mcs_pushdown.cpp @@ -482,7 +482,7 @@ derived_handler* create_columnstore_derived_handler(THD* thd, TABLE_LIST* table_ // MCOL-1482 Disable derived_handler if the multi-table update // statement contains a non-columnstore table. - if (cal_impl_if::isUpdateHasForeignTable(thd)) + if (ha_mcs_common::isUpdateHasForeignTable(thd)) { return handler; } @@ -715,18 +715,18 @@ int ha_mcs_group_by_handler::end_scan() /*@brief create_columnstore_select_handler_- Creates handler ************************************************************ - * DESCRIPTION: - * Creates a select handler if there is no non-equi JOIN, e.g - * t1.c1 > t2.c2 and logical OR in the filter predicates. - * More details in server/sql/select_handler.h - * PARAMETERS: - * thd - THD pointer. - * sel_lex - SELECT_LEX* that describes the query. - * sel_unit - SELECT_LEX_UNIT* that describes the query. - * RETURN: - * select_handler if possible - * NULL in other case - ***********************************************************/ +* DESCRIPTION: +* Creates a select handler if there is no non-equi JOIN, e.g +* t1.c1 > t2.c2 and logical OR in the filter predicates. +* More details in server/sql/select_handler.h +* PARAMETERS: +* thd - THD pointer. +* sel_lex - SELECT_LEX* that describes the query. +* sel_unit - SELECT_LEX_UNIT* that describes the query. +* RETURN: +* select_handler if possible +* NULL in other case +***********************************************************/ select_handler* create_columnstore_select_handler_(THD* thd, SELECT_LEX* sel_lex, SELECT_LEX_UNIT* sel_unit) { mcs_select_handler_mode_t select_handler_mode = get_select_handler_mode(thd); @@ -742,7 +742,7 @@ select_handler* create_columnstore_select_handler_(THD* thd, SELECT_LEX* sel_lex // MCOL-1482 Disable select_handler for a multi-table update // with a non-columnstore table as the target table of the update // operation. - if (cal_impl_if::isForeignTableUpdate(thd)) + if (ha_mcs_common::isForeignTableUpdate(thd)) { return nullptr; } @@ -810,15 +810,15 @@ select_handler* create_columnstore_select_handler_(THD* thd, SELECT_LEX* sel_lex // or unsupported feature. ha_columnstore_select_handler* handler; - if (sel_unit && sel_lex) // partial pushdown of the SELECT_LEX_UNIT + if (sel_unit && sel_lex) // partial pushdown of the SELECT_LEX_UNIT { handler = new ha_columnstore_select_handler(thd, sel_lex, sel_unit); } - else if (sel_unit) // complete pushdown of the SELECT_LEX_UNIT + else if (sel_unit) // complete pushdown of the SELECT_LEX_UNIT { handler = new ha_columnstore_select_handler(thd, sel_unit); } - else // Query only has a SELECT_LEX, no SELECT_LEX_UNIT + else // Query only has a SELECT_LEX, no SELECT_LEX_UNIT { handler = new ha_columnstore_select_handler(thd, sel_lex); } @@ -908,8 +908,7 @@ select_handler* create_columnstore_select_handler_(THD* thd, SELECT_LEX* sel_lex select_lex != select_lex->master_unit()->fake_select_lex) // (2) thd->lex->set_limit_rows_examined(); - if ((!sel_unit || sel_lex) && !join->tables_list && - (join->table_count || !select_lex->with_sum_func) && + if ((!sel_unit || sel_lex) && !join->tables_list && (join->table_count || !select_lex->with_sum_func) && !select_lex->have_window_funcs()) { if (!thd->is_error()) diff --git a/dbcon/mysql/ha_mcs_pushdown.h b/dbcon/mysql/ha_mcs_pushdown.h index e7b334a21..87ef0b826 100644 --- a/dbcon/mysql/ha_mcs_pushdown.h +++ b/dbcon/mysql/ha_mcs_pushdown.h @@ -20,6 +20,7 @@ #include "idb_mysql.h" #include "ha_mcs.h" #include "ha_mcs_sysvars.h" +#include "ha_mcs_common.h" #define NEED_CALPONT_EXTERNS #include "ha_mcs_impl.h" #include "ha_mcs_impl_if.h" diff --git a/dbcon/mysql/ha_view.cpp b/dbcon/mysql/ha_view.cpp index 4912fd71d..b419ee9ee 100644 --- a/dbcon/mysql/ha_view.cpp +++ b/dbcon/mysql/ha_view.cpp @@ -42,6 +42,7 @@ using namespace execplan; #include "ha_subquery.h" #include "ha_view.h" +#include "ha_mcs_common.h" namespace cal_impl_if { @@ -131,7 +132,7 @@ void View::transform() else { // check foreign engine tables - bool columnStore = (table_ptr->table ? isMCSTable(table_ptr->table) : true); + bool columnStore = (table_ptr->table ? ha_mcs_common::isMCSTable(table_ptr->table) : true); // trigger system catalog cache if (columnStore)