From 195425924dba55a633e5f4a69eb190c69be32bf3 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Mon, 3 Jan 2022 18:36:48 -0500 Subject: [PATCH] MCOL-4936 Disable binlog for DML statements. DML statements executed on the primary node in a ColumnStore cluster do not need to be written to the primary's binlog. This is due to ColumnStore's distributed storage architecture. With this patch, we disable writing to binlog when a DML statement (INSERT/DELETE/UPDATE/LDI/INSERT..SELECT) is performed on a ColumnStore table. HANDLER::external_lock() calls are used to 1. Turn OFF the OPTION_BIN_LOG flag 2. Turn ON the OPTION_BIN_TMP_LOG_OFF flag in THD::variables.option_bits during a WRITE lock call. THD::variables.option_bits is restored back to the original state during the UNLOCK call in HANDLER::external_lock(). Further, isDMLStatement() function is added to reduce code verbosity to check if a given statement is a DML statement. Note that with this patch, not writing to primary's binlog means DML replication from a ColumnStore cluster to another ColumnStore cluster or to another foreign engine will not work. --- dbcon/mysql/ha_mcs_impl.cpp | 121 +++++++++++++-------------------- dbcon/mysql/ha_mcs_impl_if.h | 9 +++ dbcon/mysql/ha_mcs_sysvars.cpp | 28 ++++++++ dbcon/mysql/ha_mcs_sysvars.h | 3 + 4 files changed, 89 insertions(+), 72 deletions(-) diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 5661914bb..5b7de4c22 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -2152,15 +2152,8 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, gwi.thd = thd; int rc = 0; - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) + if (thd->slave_thread && !get_replication_slave(thd) && + isDMLStatement(thd->lex->sql_command)) { if (affected_rows) *affected_rows = 0; @@ -2189,15 +2182,8 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) gp_walk_info gwi; gwi.thd = thd; - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) + if (thd->slave_thread && !get_replication_slave(thd) && + isDMLStatement(thd->lex->sql_command)) return 0; //check whether the system is ready to process statement. @@ -2569,15 +2555,8 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table) { THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) + if (thd->slave_thread && !get_replication_slave(thd) && + isDMLStatement(thd->lex->sql_command)) return HA_ERR_END_OF_FILE; if (isMCSTableUpdate(thd) || isMCSTableDelete(thd)) @@ -2655,15 +2634,8 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) int rc = 0; THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) + if (thd->slave_thread && !get_replication_slave(thd) && + isDMLStatement(thd->lex->sql_command)) return 0; cal_connection_info* ci = nullptr; @@ -3942,6 +3914,31 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condSt return cond; } +inline void disableBinlogForDML(THD* thd) +{ + if (isDMLStatement(thd->lex->sql_command) && + (thd->variables.option_bits & OPTION_BIN_LOG)) + { + set_original_option_bits(thd->variables.option_bits, thd); + thd->variables.option_bits &= ~OPTION_BIN_LOG; + thd->variables.option_bits |= OPTION_BIN_TMP_LOG_OFF; + } +} + +inline void restoreBinlogForDML(THD* thd) +{ + if (isDMLStatement(thd->lex->sql_command)) + { + ulonglong orig_option_bits = get_original_option_bits(thd); + + if (orig_option_bits) + { + thd->variables.option_bits = orig_option_bits; + set_original_option_bits(0, thd); + } + } +} + int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type) { // @bug 3014. Error out locking table command. IDB does not support it now. @@ -4007,6 +4004,7 @@ int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type) ci->physTablesList.erase(table); thd->variables.in_subquery_conversion_threshold = IN_SUBQUERY_CONVERSION_THRESHOLD; restore_optimizer_flags(thd); + restoreBinlogForDML(thd); } else { @@ -4017,9 +4015,16 @@ int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type) thd->variables.in_subquery_conversion_threshold=~ 0; // Early optimizer_switch changes to avoid unsupported opt-s. mutate_optimizer_flags(thd); + + // MCOL-4936 Disable binlog for DMLs + if (lock_type == 1) + { + disableBinlogForDML(thd); + } } else if (lock_type == 2) { + restoreBinlogForDML(thd); std::set::iterator iter = ci->physTablesList.find(table); if (iter != ci->physTablesList.end()) { @@ -4527,15 +4532,8 @@ int ha_mcs_impl_group_by_next(TABLE* table) { THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) + if (thd->slave_thread && !get_replication_slave(thd) && + isDMLStatement(thd->lex->sql_command)) return HA_ERR_END_OF_FILE; if (isMCSTableUpdate(thd) || isMCSTableDelete(thd)) @@ -4609,15 +4607,8 @@ int ha_mcs_impl_group_by_end(TABLE* table) int rc = 0; THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) + if (thd->slave_thread && !get_replication_slave(thd) && + isDMLStatement(thd->lex->sql_command)) return 0; cal_connection_info* ci = nullptr; @@ -4784,15 +4775,8 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) IDEBUG( cout << "pushdown_init for table " << endl ); THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) + if (thd->slave_thread && !get_replication_slave(thd) && + isDMLStatement(thd->lex->sql_command)) return 0; gp_walk_info gwi; @@ -5235,15 +5219,8 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table) { THD* thd = current_thd; - if (thd->slave_thread && !get_replication_slave(thd) && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) + if (thd->slave_thread && !get_replication_slave(thd) && + isDMLStatement(thd->lex->sql_command)) return HA_ERR_END_OF_FILE; int rc = HA_ERR_END_OF_FILE; diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 20822a1ea..9d0a9c4e1 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -444,6 +444,15 @@ inline bool isUpdateOrDeleteStatement(const enum_sql_command& command) isDeleteStatement(command); } +inline bool isDMLStatement(const enum_sql_command& command) +{ + return (command == SQLCOM_INSERT || + command == SQLCOM_INSERT_SELECT || + command == SQLCOM_TRUNCATE || + command == SQLCOM_LOAD || + isUpdateOrDeleteStatement(command)); +} + #ifdef DEBUG_WALK_COND void debug_walk(const Item* item, void* arg); #endif diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 2a247405b..1562e487f 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -77,6 +77,18 @@ static MYSQL_THDVAR_ULONGLONG( 1 ); +static MYSQL_THDVAR_ULONGLONG( + original_option_bits, + PLUGIN_VAR_NOSYSVAR | PLUGIN_VAR_NOCMDOPT, + "Storage for thd->variables.option_bits. For internal usage.", + NULL, + NULL, + 0, + 0, + ~0U, + 1 +); + const char* mcs_select_handler_mode_values[] = { "OFF", "ON", @@ -377,6 +389,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(compression_type), MYSQL_SYSVAR(fe_conn_info_ptr), MYSQL_SYSVAR(original_optimizer_flags), + MYSQL_SYSVAR(original_option_bits), MYSQL_SYSVAR(select_handler), MYSQL_SYSVAR(derived_handler), MYSQL_SYSVAR(group_by_handler), @@ -443,6 +456,21 @@ void set_original_optimizer_flags(ulonglong ptr, THD* thd) THDVAR(current_thd, original_optimizer_flags) = (uint64_t)(ptr); } +ulonglong get_original_option_bits(THD* thd) +{ + return (thd == NULL) ? 0 : THDVAR(thd, original_option_bits); +} + +void set_original_option_bits(ulonglong value, THD* thd) +{ + if (thd == NULL) + { + return; + } + + THDVAR(thd, original_option_bits) = (uint64_t)(value); +} + mcs_select_handler_mode_t get_select_handler_mode(THD* thd) { return ( thd == NULL ) ? mcs_select_handler_mode_t::ON : diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index 2ffba9fbd..f0d216860 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -64,6 +64,9 @@ void set_fe_conn_info_ptr(void* ptr, THD* thd = NULL); ulonglong get_original_optimizer_flags(THD* thd = NULL); void set_original_optimizer_flags(ulonglong ptr, THD* thd = NULL); +ulonglong get_original_option_bits(THD* thd = NULL); +void set_original_option_bits(ulonglong value, THD* thd = NULL); + mcs_select_handler_mode_t get_select_handler_mode(THD* thd); void set_select_handler_mode(THD* thd, ulong value);