From 5cedeb110b3e820a92d43e1f0a7249874854112a Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Wed, 27 Nov 2019 09:33:21 -0600 Subject: [PATCH] MCOL-3529 This patch implements direct_update and direct_delete features to properly report the number of rows affected by UPDATE|DELETE. --- dbcon/mysql/ha_mcs.cpp | 82 ++++++++++++++++++------------------ dbcon/mysql/ha_mcs.h | 10 ++--- dbcon/mysql/ha_mcs_impl.cpp | 63 +++++++++++---------------- dbcon/mysql/ha_mcs_impl.h | 1 + dbcon/mysql/ha_mcs_impl_if.h | 3 +- 5 files changed, 73 insertions(+), 86 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index d501140f5..de4cc11b0 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -197,10 +197,9 @@ ha_mcs::ha_mcs(handlerton* hton, TABLE_SHARE* table_arg) : handler(hton, table_arg), int_table_flags(HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE | HA_TABLE_SCAN_ON_INDEX | - HA_CAN_TABLE_CONDITION_PUSHDOWN) -// int_table_flags(HA_NO_BLOBS | HA_BINLOG_STMT_CAPABLE) -{ -} + HA_CAN_TABLE_CONDITION_PUSHDOWN | + HA_CAN_DIRECT_UPDATE_AND_DELETE) +{ } /** @@ -292,25 +291,7 @@ int ha_mcs::close(void) @details Example of this would be: @code - for (Field **field=table->field ; *field ; field++) - { - ... - } @endcode - - See ha_tina.cc for an example of extracting all of the data as strings. - ha_berekly.cc has an example of how to store it intact by "packing" it - for ha_berkeley's own native storage type. - - See the note for update_row() on auto_increments and timestamps. This - case also applies to write_row(). - - Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc, - sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc. - - @see - item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc, - sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc and sql_update.cc */ int ha_mcs::write_row(const uchar* buf) @@ -328,14 +309,6 @@ void ha_mcs::start_bulk_insert(ha_rows rows, uint flags) DBUG_VOID_RETURN; } -int ha_mcs::end_bulk_insert(bool abort) -{ - DBUG_ENTER("ha_mcs::end_bulk_insert"); - int rc = ha_mcs_impl_end_bulk_insert(abort, table); - DBUG_RETURN(rc); -} - -/**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */ int ha_mcs::end_bulk_insert() { DBUG_ENTER("ha_mcs::end_bulk_insert"); @@ -351,19 +324,10 @@ int ha_mcs::end_bulk_insert() clause was used. Consecutive ordering is not guaranteed. @details - Currently new_data will not have an updated auto_increament record, or - and updated timestamp field. You can do these for example by doing: - @code - if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) - table->timestamp_field->set_time(); - if (table->next_number_field && record == table->record[0]) - update_auto_increment(); + @code @endcode - Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc. - @see - sql_select.cc, sql_acl.cc, sql_update.cc and sql_insert.cc */ int ha_mcs::update_row(const uchar* old_data, uchar* new_data) { @@ -373,7 +337,45 @@ int ha_mcs::update_row(const uchar* old_data, uchar* new_data) DBUG_RETURN(rc); } +// WIP +/** + @brief + Yes, update_row() does what you expect, it updates a row. old_data will have + the previous row record in it, while new_data will have the newest data in it. + Keep in mind that the server can do updates based on ordering if an ORDER BY + clause was used. Consecutive ordering is not guaranteed. + @details + @code + @endcode + + @see +*/ +int ha_mcs::direct_update_rows_init(List *update_fields) +{ + DBUG_ENTER("ha_mcs::direct_update_rows_init"); + DBUG_RETURN(0); +} + +int ha_mcs::direct_update_rows(ha_rows *update_rows) +{ + DBUG_ENTER("ha_mcs::direct_update_rows"); + int rc = ha_mcs_impl_direct_update_delete_rows(update_rows); + DBUG_RETURN(rc); +} + +int ha_mcs::direct_delete_rows_init() +{ + DBUG_ENTER("ha_mcs::direct_delete_rows_init"); + DBUG_RETURN(0); +} + +int ha_mcs::direct_delete_rows(ha_rows *deleted_rows) +{ + DBUG_ENTER("ha_mcs::direct_delete_rows"); + int rc = ha_mcs_impl_direct_update_delete_rows(deleted_rows); + DBUG_RETURN(rc); +} /** @brief This will delete a row. buf will contain a copy of the row to be deleted. diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index c2a9d6532..a62a746a7 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -143,12 +143,6 @@ public: */ void start_bulk_insert(ha_rows rows, uint flags = 0) ; - /** @brief - We implement this in ha_example.cc. It's not an obligatory method; - skip it and and MySQL will treat it as not implemented. - */ - int end_bulk_insert(bool abort) ; - /**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */ int end_bulk_insert() ; @@ -157,12 +151,16 @@ public: skip it and and MySQL will treat it as not implemented. */ int update_row(const uchar* old_data, uchar* new_data); + int direct_update_rows_init(List *update_fields); + int direct_update_rows(ha_rows *update_rows); /** @brief We implement this in ha_example.cc. It's not an obligatory method; skip it and and MySQL will treat it as not implemented. */ int delete_row(const uchar* buf); + int direct_delete_rows_init(); + int direct_delete_rows(ha_rows *deleted_rows); /** @brief We implement this in ha_example.cc. It's not an obligatory method; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 57dbe8bdb..cac9eaa3d 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -16,11 +16,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -/* - * $Id: ha_mcs_impl.cpp 9642 2013-06-24 14:57:42Z rdempsey $ - */ - -//#define DEBUG_WALK_COND #include #ifndef _MSC_VER #include @@ -41,7 +36,6 @@ #include #include #include -//#define NDEBUG #include #include #include @@ -61,7 +55,6 @@ using namespace std; #include #include #include -//using namespace boost; #include "idb_mysql.h" @@ -1184,7 +1177,7 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in return returnVal; } -uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) +uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, ha_rows *affected_rows) { if (get_fe_conn_info_ptr() == NULL) set_fe_conn_info_ptr((void*)new cal_connection_info()); @@ -1485,7 +1478,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) } else if ( value->type() == Item::NULL_ITEM ) { -// dmlStmt += "NULL"; columnAssignmentPtr->fScalarExpression = ""; columnAssignmentPtr->fFromCol = false; columnAssignmentPtr->fIsNull = true; @@ -1494,13 +1486,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) { isFromCol = true; columnAssignmentPtr->fFromCol = true; -// Item_field* setIt = reinterpret_cast (value); -// string sectableName = string(setIt->table_name); -// string secschemaName = string(setIt->db_name); -// if ( (strcasecmp(tableName.c_str(), sectableName.c_str()) != 0) || (strcasecmp(schemaName.c_str(), secschemaName.c_str()) != 0)) -// { - isFromSameTable = false; -// } + isFromSameTable = false; } //@Bug 4449 handle default value else if (value->type() == Item::DEFAULT_VALUE_ITEM) @@ -1554,6 +1540,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) { if (timeStampColumnNames.find(onUpdateTimeStampColumns[i]) == timeStampColumnNames.end()) { + // DRRTUY * That is far from ideal. columnAssignmentPtr = new ColumnAssignment(string(onUpdateTimeStampColumns[i]), "=", ""); struct timeval tv; char buf[64]; @@ -1729,9 +1716,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) return 0; } - //if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) ) { - //updateCP->subType (CalpontSelectExecutionPlan::SINGLEROW_SUBS); //set scalar updateCP->subType (CalpontSelectExecutionPlan::SELECT_SUBS); //@Bug 2975. SessionManager sm; @@ -1834,6 +1819,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) } updateCP->tableList( tbList ); + // DRRTUY * this is very optimisic assumption updateCP->overrideLargeSideEstimate( true ); //loop through returnedcols to find out constant columns CalpontSelectExecutionPlan::ReturnedColumnList returnedCols = updateCP->returnedCols(); @@ -1847,7 +1833,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) { returnedCols.erase(coliter); coliter = returnedCols.begin(); - //cout << "constant column " << endl; } else coliter++; @@ -1929,16 +1914,14 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) ptsub->walk(makeUpdateSemiJoin, &tbList[0] ); } - //cout<< "Plan is " << endl << *updateCP << endl; if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) ) thd->lex->first_select_lex()->item_list = items; } - //cout<< "Plan is " << endl << *updateCP << endl; } - //updateCP->traceFlags(1); //cout<< "Plan is " << endl << *updateCP << endl; + //updateCP->traceFlags(1); pDMLPackage->HasFilter(true); pDMLPackage->uuid(updateCP->uuid()); @@ -1947,7 +1930,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) boost::shared_ptr plan = pDMLPackage->get_ExecutionPlan(); updateCP->rmParms(ci->rmParms); updateCP->serialize(*plan); - //cout << "plan has bytes " << plan->length() << endl; pDMLPackage->write(bytestream); delete pDMLPackage; @@ -2203,8 +2185,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) //@Bug 2540. Set error status instead of warning thd->raise_error_printf(ER_INTERNAL_ERROR, errorMsg.c_str()); ci->rc = b; - thd->set_row_count_func(0); - thd->get_stmt_da()->set_overwrite_status(true); + // WIP + //thd->get_stmt_da()->set_overwrite_status(true); //cout << " error status " << ci->rc << endl; } @@ -2219,7 +2201,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) } else { - thd->set_row_count_func(dmlRowCount+thd->get_row_count_func()); + if (affected_rows) + *affected_rows = dmlRowCount; } push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errorMsg.c_str()); @@ -2227,19 +2210,11 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) else { // if (dmlRowCount != 0) //Bug 5117. Handling self join. - thd->set_row_count_func(dmlRowCount+thd->get_row_count_func()); - - + if (affected_rows) + *affected_rows = dmlRowCount; //cout << " error status " << ci->rc << " and rowcount = " << dmlRowCount << endl; } - // @bug 4027. comment out the following because this will cause mysql - // kernel assertion failure. not sure why this is here in the first place. -// if (thd->derived_tables) -// { -// thd->get_stmt_da()->set_overwrite_status(true); -// thd->get_stmt_da()->set_ok_status( thd, dmlRowCount, 0, ""); -// } // insert query stats ci->stats.setEndTime(); @@ -2291,6 +2266,16 @@ int ha_mcs_impl_discover_existence(const char* schema, const char* name) return 0; } +int ha_mcs_impl_direct_update_delete_rows(ha_rows *affected_rows) +{ + THD* thd = current_thd; + cal_impl_if::gp_walk_info gwi; + gwi.thd = thd; + + int rc = doUpdateDelete(thd, gwi, affected_rows); + return rc; +} + int ha_mcs_impl_rnd_init(TABLE* table) { IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl ); @@ -2350,7 +2335,7 @@ int ha_mcs_impl_rnd_init(TABLE* table) //Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) - return doUpdateDelete(thd, gwi); + return doUpdateDelete(thd, gwi, NULL); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); @@ -4080,6 +4065,7 @@ int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type) && (mapiter->second.condInfo && mapiter->second.csep) && lock_type == 2) { + // CS ends up processing query with handlers // table mode if (mapiter->second.conn_hndl) { @@ -4125,6 +4111,7 @@ int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type) ci->physTablesList.erase(table); } + // CS ends up processing query with handlers if (iter != ci->physTablesList.end() && ci->physTablesList.empty()) { if (!ci->cal_conn_hndl) @@ -4933,7 +4920,7 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) //Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) - return doUpdateDelete(thd, gwi); + return doUpdateDelete(thd, gwi, NULL); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); diff --git a/dbcon/mysql/ha_mcs_impl.h b/dbcon/mysql/ha_mcs_impl.h index 48322ff27..692983d5b 100644 --- a/dbcon/mysql/ha_mcs_impl.h +++ b/dbcon/mysql/ha_mcs_impl.h @@ -42,6 +42,7 @@ extern int ha_mcs_impl_close_connection (handlerton* hton, THD* thd); extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table); extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type); extern int ha_mcs_impl_update_row(); +extern int ha_mcs_impl_direct_update_delete_rows(ha_rows *affected_rows); extern int ha_mcs_impl_delete_row(); extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos); extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table); diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 8e2ec327a..1bb9f40c9 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -1,5 +1,6 @@ /* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation + Copyright (C) 2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -16,8 +17,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -// $Id: ha_mcs_impl_if.h 9701 2013-07-17 18:51:55Z zzhu $ -/** @file */ #ifndef HA_MCS_IMPL_IF_H__ #define HA_MCS_IMPL_IF_H__ #include