diff --git a/dbcon/mysql/ha_calpont_ddl.cpp b/dbcon/mysql/ha_calpont_ddl.cpp index 20398fe3d..0f0f24a1b 100644 --- a/dbcon/mysql/ha_calpont_ddl.cpp +++ b/dbcon/mysql/ha_calpont_ddl.cpp @@ -2078,8 +2078,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* if ( schemaSyncOnly && isCreate) return rc; - //this is replcated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return rc; //@bug 5660. Error out REAL DDL/DML on slave node. @@ -2277,8 +2276,7 @@ int ha_calpont_impl_delete_table_(const char* db, const char* name, cal_connecti return 0; } - //this is replcated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return 0; //@bug 5660. Error out REAL DDL/DML on slave node. @@ -2417,8 +2415,7 @@ int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connecti pair toPair; string stmt; - //this is replicated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return 0; //@bug 5660. Error out REAL DDL/DML on slave node. diff --git a/dbcon/mysql/ha_calpont_dml.cpp b/dbcon/mysql/ha_calpont_dml.cpp index 8f5a5f21c..0e2bce76c 100644 --- a/dbcon/mysql/ha_calpont_dml.cpp +++ b/dbcon/mysql/ha_calpont_dml.cpp @@ -2078,7 +2078,8 @@ int ha_calpont_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connectio thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE ) return rc; - if (thd->slave_thread) return 0; + if (thd->slave_thread && !ci.replicationEnabled) + return 0; std::string command("COMMIT"); #ifdef INFINIDB_DEBUG diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 52c270cf0..7dffcd36f 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -925,7 +925,6 @@ uint32_t doUpdateDelete(THD* thd) cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - //@bug 5660. Error out DDL/DML on slave node, or on local query node if (ci->isSlaveNode && !thd->slave_thread) { string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE); @@ -947,7 +946,14 @@ uint32_t doUpdateDelete(THD* thd) // stats start ci->stats.reset(); ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -2871,8 +2877,9 @@ int ha_calpont_impl_rnd_init(TABLE* table) // prevent "create table as select" from running on slave thd->infinidb_vtable.hasInfiniDBTable = true; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( + cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (thd->slave_thread && !ci->replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -2925,8 +2932,6 @@ int ha_calpont_impl_rnd_init(TABLE* table) if (!thd->infinidb_vtable.cal_conn_info) thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - idbassert(ci != 0); // MySQL sometimes calls rnd_init multiple times, plan should only be @@ -3057,7 +3062,14 @@ int ha_calpont_impl_rnd_init(TABLE* table) { ci->stats.reset(); // reset query stats ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -3422,8 +3434,9 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) { THD* thd = current_thd; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( + cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (thd->slave_thread && !ci->replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -3434,7 +3447,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return 0; - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) return ER_INTERNAL_ERROR; @@ -3463,8 +3475,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) if (!thd->infinidb_vtable.cal_conn_info) thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { @@ -3547,8 +3557,17 @@ int ha_calpont_impl_rnd_end(TABLE* table) int rc = 0; THD* thd = current_thd; cal_connection_info* ci = NULL; + bool replicationEnabled = false; - if (thd->slave_thread && ( + if (thd->infinidb_vtable.cal_conn_info) + ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (ci && ci->replicationEnabled) + { + replicationEnabled = true; + } + + if (thd->slave_thread && !replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -3561,9 +3580,6 @@ int ha_calpont_impl_rnd_end(TABLE* table) thd->infinidb_vtable.isNewQuery = true; - if (thd->infinidb_vtable.cal_conn_info) - ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY ) { thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE; // flip back to normal state @@ -3861,9 +3877,8 @@ int ha_calpont_impl_write_row(uchar* buf, TABLE* table) cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - if (thd->slave_thread) return 0; - - + if (thd->slave_thread && !ci->replicationEnabled) + return 0; if (ci->alterTableState > 0) return 0; @@ -3948,7 +3963,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE) thd->infinidb_vtable.isInfiniDBDML = true; - if (thd->slave_thread) return; + if (thd->slave_thread && !ci->replicationEnabled) + return; //@bug 5660. Error out DDL/DML on slave node, or on local query node if (ci->isSlaveNode && thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE) @@ -4422,7 +4438,14 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) // query stats. only collect execution time and rows inserted for insert/load_data_infile ci->stats.reset(); ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -4508,7 +4531,8 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table) cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - if (thd->slave_thread) return 0; + if (thd->slave_thread && !ci->replicationEnabled) + return 0; int rc = 0; @@ -5209,7 +5233,14 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE { ci->stats.reset(); // reset query stats ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -5628,19 +5659,6 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE { THD* thd = current_thd; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) - return 0; - - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) return ER_INTERNAL_ERROR; @@ -5666,6 +5684,17 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + if (thd->slave_thread && !ci->replicationEnabled && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { @@ -5750,18 +5779,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* THD* thd = current_thd; cal_connection_info* ci = NULL; - - if (thd->slave_thread && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) - return 0; - thd->infinidb_vtable.isNewQuery = true; thd->infinidb_vtable.isUnion = false; @@ -5775,6 +5792,23 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* // return rc; //} + if (!ci) + { + thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + } + + if (thd->slave_thread && !ci->replicationEnabled && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) { @@ -5801,12 +5835,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* } } - if (!ci) - { - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - } - // @bug 3078. Also session limit variable works the same as ctrl+c if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD || ((thd->lex)->sql_command != SQLCOM_INSERT && diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 72579111b..f107bac06 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -251,10 +251,18 @@ struct cal_connection_info useXbit(false), utf8(false), useCpimport(1), - delimiter('\7') + delimiter('\7'), + replicationEnabled(false) { // check if this is a slave mysql daemon isSlaveNode = checkSlave(); + + std::string option = config::Config::makeConfig()->getConfig("SystemConfig", "ReplicationEnabled"); + + if (!option.compare("Y")) + { + replicationEnabled = true; + } } static bool checkSlave() @@ -319,6 +327,7 @@ struct cal_connection_info char delimiter; char enclosed_by; std::vector columnTypes; + bool replicationEnabled; }; typedef std::tr1::unordered_map CalConnMap;