From afb638b9bd32e8d343e88fe4db5a64a4df35d689 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Tue, 3 Aug 2021 22:49:22 +0000 Subject: [PATCH] MCOL-4805 For functions in the plugin code that disable replication on the slave threads, we now check for this condition early on in the function block. --- dbcon/mysql/ha_mcs_ddl.cpp | 20 ++++--- dbcon/mysql/ha_mcs_dml.cpp | 4 +- dbcon/mysql/ha_mcs_impl.cpp | 108 +++++++++++++++++++----------------- 3 files changed, 69 insertions(+), 63 deletions(-) diff --git a/dbcon/mysql/ha_mcs_ddl.cpp b/dbcon/mysql/ha_mcs_ddl.cpp index c6d56757b..06c215084 100644 --- a/dbcon/mysql/ha_mcs_ddl.cpp +++ b/dbcon/mysql/ha_mcs_ddl.cpp @@ -2215,6 +2215,9 @@ int ha_mcs_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* crea #endif THD* thd = current_thd; + if (thd->slave_thread && !get_replication_slave(thd)) + return 0; + char* query = thd->query(); if (!query) @@ -2313,9 +2316,6 @@ int ha_mcs_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* crea if ( schemaSyncOnly && isCreate) return rc; - if (thd->slave_thread && !get_replication_slave(thd)) - return rc; - //@bug 5660. Error out REAL DDL/DML on slave node. // When the statement gets here, it's NOT SSO or RESTRICT if (ci.isSlaveNode) @@ -2529,6 +2529,10 @@ int ha_mcs_impl_delete_table_(const char* db, const char* name, cal_connection_i cout << "ha_mcs_impl_delete_table: " << db << name << endl; #endif THD* thd = current_thd; + + if (thd->slave_thread && !get_replication_slave(thd)) + return 0; + char* query = thd->query(); if (!query) @@ -2548,9 +2552,6 @@ int ha_mcs_impl_delete_table_(const char* db, const char* name, cal_connection_i return 0; } - if (thd->slave_thread && !get_replication_slave(thd)) - return 0; - //@bug 5660. Error out REAL DDL/DML on slave node. // When the statement gets here, it's NOT SSO or RESTRICT if (ci.isSlaveNode) @@ -2588,6 +2589,10 @@ int ha_mcs_impl_delete_table_(const char* db, const char* name, cal_connection_i int ha_mcs_impl_rename_table_(const char* from, const char* to, cal_connection_info& ci) { THD* thd = current_thd; + + if (thd->slave_thread && !get_replication_slave(thd)) + return 0; + string emsg; string tblFrom (from+2); @@ -2602,9 +2607,6 @@ int ha_mcs_impl_rename_table_(const char* from, const char* to, cal_connection_i string stmt; - if (thd->slave_thread && !get_replication_slave(thd)) - return 0; - // This is a temporary table rename, we don't use the temporary table name // so this is a NULL op if (tblFrom.compare(0, 4, "#sql") == 0) diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index fafdf3f79..7cdb55d0e 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -948,11 +948,11 @@ std::string ha_mcs_impl_cleartablelock( int ha_mcs_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connection_info& ci ) { - int rc = 0; - if (thd->slave_thread && !get_replication_slave(thd)) return 0; + int rc = 0; + std::string command("COMMIT"); #ifdef INFINIDB_DEBUG cout << "COMMIT" << endl; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index dc7b5257f..747a7d46d 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -2191,6 +2191,17 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) gp_walk_info gwi; gwi.thd = thd; + if (thd->slave_thread && !get_replication_slave(thd) && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + //check whether the system is ready to process statement. #ifndef _MSC_VER static DBRM dbrm(true); @@ -2219,17 +2230,6 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) - return 0; - #if 0 if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0) { @@ -2571,8 +2571,6 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table) { THD* thd = current_thd; - cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || @@ -2584,6 +2582,8 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return HA_ERR_END_OF_FILE; + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + if (isUpdateOrDeleteStatement(thd->lex->sql_command, !isForeignTableUpdate(thd))) return HA_ERR_END_OF_FILE; @@ -2656,10 +2656,6 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) { int rc = 0; THD* thd = current_thd; - cal_connection_info* ci = nullptr; - - if (get_fe_conn_info_ptr() != NULL) - ci = reinterpret_cast(get_fe_conn_info_ptr()); if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || @@ -2672,8 +2668,11 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) thd->lex->sql_command == SQLCOM_LOAD)) return 0; + cal_connection_info* ci = nullptr; + if (get_fe_conn_info_ptr() != NULL) ci = reinterpret_cast(get_fe_conn_info_ptr()); + if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE ) return rc; @@ -2863,6 +2862,10 @@ int ha_mcs_impl_delete_table(const char* name) int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed) { THD* thd = current_thd; + + if (thd->slave_thread && !get_replication_slave(thd)) + return 0; + // Error out INSERT on VIEW. It's currently not supported. // @note INSERT on VIEW works natually (for simple cases at least), but we choose to turn it off // for now - ZZ. @@ -2886,11 +2889,9 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed) if (rows_changed == 0) ci->tableValuesMap.clear(); - if (thd->slave_thread && !get_replication_slave(thd)) + if (ci->alterTableState > 0) return 0; - if (ci->alterTableState > 0) return 0; - ha_rows rowsInserted = 0; int rc = 0; @@ -2960,6 +2961,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins { THD* thd = current_thd; + if (thd->slave_thread && !get_replication_slave(thd)) + return; + if (get_fe_conn_info_ptr() == nullptr) set_fe_conn_info_ptr((void*)new cal_connection_info()); @@ -2968,9 +2972,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins // clear rows variable ci->rowsHaveInserted = 0; - if (ci->alterTableState > 0) return; - - if (thd->slave_thread && !get_replication_slave(thd)) + if (ci->alterTableState > 0) return; //@bug 5660. Error out DDL/DML on slave node, or on local query node @@ -3504,6 +3506,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) bitmap_clear_all(table->read_set); THD* thd = current_thd; + if (thd->slave_thread && !get_replication_slave(thd)) + return 0; + std::string aTmpDir(startup::StartUp::tmpDir()); if (get_fe_conn_info_ptr() == nullptr) @@ -3511,9 +3516,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !get_replication_slave(thd)) - return 0; - int rc = 0; if (ci->rc == 5) //read only dbrm @@ -4527,14 +4529,6 @@ int ha_mcs_impl_group_by_next(TABLE* table) { THD* thd = current_thd; - if (isUpdateOrDeleteStatement(thd->lex->sql_command, !isForeignTableUpdate(thd))) - return HA_ERR_END_OF_FILE; - - if (get_fe_conn_info_ptr() == nullptr) - set_fe_conn_info_ptr((void*)new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || @@ -4546,6 +4540,14 @@ int ha_mcs_impl_group_by_next(TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return HA_ERR_END_OF_FILE; + if (isUpdateOrDeleteStatement(thd->lex->sql_command, !isForeignTableUpdate(thd))) + return HA_ERR_END_OF_FILE; + + if (get_fe_conn_info_ptr() == nullptr) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { force_close_fep_conn(thd, ci); @@ -4608,7 +4610,6 @@ int ha_mcs_impl_group_by_end(TABLE* table) { int rc = 0; THD* thd = current_thd; - cal_connection_info* ci = nullptr; if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || @@ -4621,6 +4622,8 @@ int ha_mcs_impl_group_by_end(TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return 0; + cal_connection_info* ci = nullptr; + if (get_fe_conn_info_ptr() != NULL) ci = reinterpret_cast(get_fe_conn_info_ptr()); @@ -4783,6 +4786,17 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) IDEBUG( cout << "pushdown_init for table " << endl ); THD* thd = current_thd; + if (thd->slave_thread && !get_replication_slave(thd) && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + gp_walk_info gwi; gwi.thd = thd; bool err = false; @@ -4818,17 +4832,6 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) return 0; } - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) - return 0; - // MCOL-4023 We need to test this code path. // Update and delete code if (isUpdateOrDeleteStatement(thd->lex->sql_command)) @@ -5232,14 +5235,8 @@ internal_error: int ha_mcs_impl_select_next(uchar* buf, TABLE* table) { - int rc = HA_ERR_END_OF_FILE; THD* thd = current_thd; - if (get_fe_conn_info_ptr() == nullptr) - set_fe_conn_info_ptr((void*)new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || @@ -5251,6 +5248,13 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return HA_ERR_END_OF_FILE; + int rc = HA_ERR_END_OF_FILE; + + if (get_fe_conn_info_ptr() == nullptr) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + if (isUpdateOrDeleteStatement(thd->lex->sql_command)) return rc;