From f9f966fe962e36169f83d453ff66cad05b9e1908 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Mon, 15 Apr 2019 14:45:34 +0100 Subject: [PATCH] MCOL-593 Add optional MariaDB replication support This patch will allow MariaDB replication into UM1 when enabling the following is added to the SystemConfig section of Columnstore.xml: Y The intended use case is to replication from an InnoDB MariaDB server into ColumnStore. You would need to create the tables on the ColumnStore slave as "ColumnStore" and the same tables in the master as InnoDB. At the moment the use case is narrow and could be prone to problems so this will use the hidden flag until we can improve it. --- dbcon/mysql/ha_calpont_ddl.cpp | 9 +- dbcon/mysql/ha_calpont_dml.cpp | 3 +- dbcon/mysql/ha_calpont_impl.cpp | 136 +++++++++++++++++++------------ dbcon/mysql/ha_calpont_impl_if.h | 11 ++- 4 files changed, 97 insertions(+), 62 deletions(-) diff --git a/dbcon/mysql/ha_calpont_ddl.cpp b/dbcon/mysql/ha_calpont_ddl.cpp index 20398fe3d..0f0f24a1b 100644 --- a/dbcon/mysql/ha_calpont_ddl.cpp +++ b/dbcon/mysql/ha_calpont_ddl.cpp @@ -2078,8 +2078,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* if ( schemaSyncOnly && isCreate) return rc; - //this is replcated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return rc; //@bug 5660. Error out REAL DDL/DML on slave node. @@ -2277,8 +2276,7 @@ int ha_calpont_impl_delete_table_(const char* db, const char* name, cal_connecti return 0; } - //this is replcated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return 0; //@bug 5660. Error out REAL DDL/DML on slave node. @@ -2417,8 +2415,7 @@ int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connecti pair toPair; string stmt; - //this is replicated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return 0; //@bug 5660. Error out REAL DDL/DML on slave node. diff --git a/dbcon/mysql/ha_calpont_dml.cpp b/dbcon/mysql/ha_calpont_dml.cpp index 8f5a5f21c..0e2bce76c 100644 --- a/dbcon/mysql/ha_calpont_dml.cpp +++ b/dbcon/mysql/ha_calpont_dml.cpp @@ -2078,7 +2078,8 @@ int ha_calpont_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connectio thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE ) return rc; - if (thd->slave_thread) return 0; + if (thd->slave_thread && !ci.replicationEnabled) + return 0; std::string command("COMMIT"); #ifdef INFINIDB_DEBUG diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 52c270cf0..7dffcd36f 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -925,7 +925,6 @@ uint32_t doUpdateDelete(THD* thd) cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - //@bug 5660. Error out DDL/DML on slave node, or on local query node if (ci->isSlaveNode && !thd->slave_thread) { string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE); @@ -947,7 +946,14 @@ uint32_t doUpdateDelete(THD* thd) // stats start ci->stats.reset(); ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -2871,8 +2877,9 @@ int ha_calpont_impl_rnd_init(TABLE* table) // prevent "create table as select" from running on slave thd->infinidb_vtable.hasInfiniDBTable = true; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( + cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (thd->slave_thread && !ci->replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -2925,8 +2932,6 @@ int ha_calpont_impl_rnd_init(TABLE* table) if (!thd->infinidb_vtable.cal_conn_info) thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - idbassert(ci != 0); // MySQL sometimes calls rnd_init multiple times, plan should only be @@ -3057,7 +3062,14 @@ int ha_calpont_impl_rnd_init(TABLE* table) { ci->stats.reset(); // reset query stats ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -3422,8 +3434,9 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) { THD* thd = current_thd; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( + cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (thd->slave_thread && !ci->replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -3434,7 +3447,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return 0; - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) return ER_INTERNAL_ERROR; @@ -3463,8 +3475,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) if (!thd->infinidb_vtable.cal_conn_info) thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { @@ -3547,8 +3557,17 @@ int ha_calpont_impl_rnd_end(TABLE* table) int rc = 0; THD* thd = current_thd; cal_connection_info* ci = NULL; + bool replicationEnabled = false; - if (thd->slave_thread && ( + if (thd->infinidb_vtable.cal_conn_info) + ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (ci && ci->replicationEnabled) + { + replicationEnabled = true; + } + + if (thd->slave_thread && !replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -3561,9 +3580,6 @@ int ha_calpont_impl_rnd_end(TABLE* table) thd->infinidb_vtable.isNewQuery = true; - if (thd->infinidb_vtable.cal_conn_info) - ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY ) { thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE; // flip back to normal state @@ -3861,9 +3877,8 @@ int ha_calpont_impl_write_row(uchar* buf, TABLE* table) cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - if (thd->slave_thread) return 0; - - + if (thd->slave_thread && !ci->replicationEnabled) + return 0; if (ci->alterTableState > 0) return 0; @@ -3948,7 +3963,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE) thd->infinidb_vtable.isInfiniDBDML = true; - if (thd->slave_thread) return; + if (thd->slave_thread && !ci->replicationEnabled) + return; //@bug 5660. Error out DDL/DML on slave node, or on local query node if (ci->isSlaveNode && thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE) @@ -4422,7 +4438,14 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) // query stats. only collect execution time and rows inserted for insert/load_data_infile ci->stats.reset(); ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -4508,7 +4531,8 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table) cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - if (thd->slave_thread) return 0; + if (thd->slave_thread && !ci->replicationEnabled) + return 0; int rc = 0; @@ -5209,7 +5233,14 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE { ci->stats.reset(); // reset query stats ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -5628,19 +5659,6 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE { THD* thd = current_thd; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) - return 0; - - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) return ER_INTERNAL_ERROR; @@ -5666,6 +5684,17 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + if (thd->slave_thread && !ci->replicationEnabled && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { @@ -5750,18 +5779,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* THD* thd = current_thd; cal_connection_info* ci = NULL; - - if (thd->slave_thread && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) - return 0; - thd->infinidb_vtable.isNewQuery = true; thd->infinidb_vtable.isUnion = false; @@ -5775,6 +5792,23 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* // return rc; //} + if (!ci) + { + thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + } + + if (thd->slave_thread && !ci->replicationEnabled && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) { @@ -5801,12 +5835,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* } } - if (!ci) - { - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - } - // @bug 3078. Also session limit variable works the same as ctrl+c if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD || ((thd->lex)->sql_command != SQLCOM_INSERT && diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 72579111b..f107bac06 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -251,10 +251,18 @@ struct cal_connection_info useXbit(false), utf8(false), useCpimport(1), - delimiter('\7') + delimiter('\7'), + replicationEnabled(false) { // check if this is a slave mysql daemon isSlaveNode = checkSlave(); + + std::string option = config::Config::makeConfig()->getConfig("SystemConfig", "ReplicationEnabled"); + + if (!option.compare("Y")) + { + replicationEnabled = true; + } } static bool checkSlave() @@ -319,6 +327,7 @@ struct cal_connection_info char delimiter; char enclosed_by; std::vector columnTypes; + bool replicationEnabled; }; typedef std::tr1::unordered_map CalConnMap;