diff --git a/dbcon/mysql/ha_mcs_ddl.cpp b/dbcon/mysql/ha_mcs_ddl.cpp index b13032269..61bb08a6a 100644 --- a/dbcon/mysql/ha_mcs_ddl.cpp +++ b/dbcon/mysql/ha_mcs_ddl.cpp @@ -2360,7 +2360,7 @@ int ha_mcs_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* crea if ( schemaSyncOnly && isCreate) return rc; - if (thd->slave_thread && !ci.replicationEnabled) + if (thd->slave_thread && !get_replication_slave(thd)) return rc; //@bug 5660. Error out REAL DDL/DML on slave node. @@ -2558,7 +2558,7 @@ int ha_mcs_impl_delete_table_(const char* db, const char* name, cal_connection_i return 0; } - if (thd->slave_thread && !ci.replicationEnabled) + if (thd->slave_thread && !get_replication_slave(thd)) return 0; //@bug 5660. Error out REAL DDL/DML on slave node. @@ -2697,7 +2697,7 @@ int ha_mcs_impl_rename_table_(const char* from, const char* to, cal_connection_i pair toPair; string stmt; - if (thd->slave_thread && !ci.replicationEnabled) + if (thd->slave_thread && !get_replication_slave(thd)) return 0; //@bug 5660. Error out REAL DDL/DML on slave node. diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index 0760a8e95..df3d2d206 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -159,7 +159,17 @@ int ProcessCommandStatement(THD* thd, string& dmlStatement, cal_connection_info& //@Bug 2721 and 2722. Log the statement before issuing commit/rollback if ( dmlStatement == "LOGGING" ) { - VendorDMLStatement cmdStmt(idb_mysql_query_str(thd), DML_COMMAND, sessionID); + char* query_char = idb_mysql_query_str(thd); + std::string query_str; + if (!query_char) + { + query_str = ""; + } + else + { + query_str = query_char; + } + VendorDMLStatement cmdStmt(query_str, DML_COMMAND, sessionID); cmdStmt.set_Logging( false ); pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt); pDMLPackage->set_Logging( false ); @@ -250,7 +260,18 @@ int doProcessInsertValues ( TABLE* table, uint32_t size, cal_connection_info& ci int rc = 0; - VendorDMLStatement dmlStmts(idb_mysql_query_str(thd), DML_INSERT, table->s->table_name.str, + char* query_char = idb_mysql_query_str(thd); + std::string query_str; + if (!query_char) + { + query_str = ""; + } + else + { + query_str = query_char; + } + + VendorDMLStatement dmlStmts(query_str, DML_INSERT, table->s->table_name.str, table->s->db.str, size, ci.colNameList.size(), ci.colNameList, ci.tableValuesMap, ci.nullValuesBitset, sessionID); @@ -2009,7 +2030,7 @@ int ha_mcs_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connection_in { int rc = 0; - if (thd->slave_thread && !ci.replicationEnabled) + if (thd->slave_thread && !get_replication_slave(thd)) return 0; std::string command("COMMIT"); diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 9ecbdb22d..2ce3cb620 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -1255,6 +1255,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) args.add("Update"); + else if (thd->get_command() == COM_SLAVE_SQL) + args.add("Row based replication event"); else args.add("Delete"); @@ -1282,7 +1284,16 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) } // @bug 1127. Re-construct update stmt using lex instead of using the original query. - string dmlStmt = string(idb_mysql_query_str(thd)); + char* query_char = idb_mysql_query_str(thd); + std::string dmlStmt; + if (!query_char) + { + dmlStmt = ""; + } + else + { + dmlStmt = query_char; + } string schemaName; string tableName(""); string aliasName(""); @@ -1555,6 +1566,12 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) } } } + else if (thd->get_command() == COM_SLAVE_SQL) + { + string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_RBR_EVENT); + setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg); + return ER_CHECK_NOT_IMPLEMENTED; + } else { updateCP->queryType(CalpontSelectExecutionPlan::DELETE); @@ -1684,7 +1701,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) pDMLPackage->set_IsFromCol( true ); //cout << " setting isFromCol to " << isFromCol << endl; - string origStmt(idb_mysql_query_str(thd)); + std::string origStmt = dmlStmt; origStmt += ";"; pDMLPackage->set_SQLStatement( origStmt ); @@ -1733,7 +1750,17 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) updateCP->txnID(txnID.id); updateCP->verID(verID); updateCP->sessionID(sessionID); - updateCP->data(idb_mysql_query_str(thd)); + char* query_char = idb_mysql_query_str(thd); + std::string query_str; + if (!query_char) + { + query_str = ""; + } + else + { + query_str = query_char; + } + updateCP->data(query_str); try { @@ -2300,7 +2327,7 @@ int ha_mcs_impl_rnd_init(TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !ci->replicationEnabled && ( + if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -2311,6 +2338,13 @@ int ha_mcs_impl_rnd_init(TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return 0; + if (thd->slave_thread && thd->get_command() == COM_SLAVE_SQL) + { + string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_RBR_EVENT); + setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg); + return ER_CHECK_NOT_IMPLEMENTED; + } + if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE ) return 0; @@ -2632,7 +2666,7 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !ci->replicationEnabled && ( + if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -2718,17 +2752,11 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) int rc = 0; THD* thd = current_thd; cal_connection_info* ci = NULL; - bool replicationEnabled = false; if (get_fe_conn_info_ptr() != NULL) ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (ci && ci->replicationEnabled) - { - replicationEnabled = true; - } - - if (thd->slave_thread && !replicationEnabled && ( + if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -2966,7 +2994,7 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !ci->replicationEnabled) + if (thd->slave_thread && !get_replication_slave(thd)) return 0; if (ci->alterTableState > 0) return 0; @@ -3043,7 +3071,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table) if (ci->alterTableState > 0) return; - if (thd->slave_thread && !ci->replicationEnabled) + if (thd->slave_thread && !get_replication_slave(thd)) return; //@bug 5660. Error out DDL/DML on slave node, or on local query node @@ -3576,7 +3604,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !ci->replicationEnabled) + if (thd->slave_thread && !get_replication_slave(thd)) return 0; int rc = 0; @@ -4612,7 +4640,7 @@ int ha_mcs_impl_group_by_next(TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !ci->replicationEnabled && ( + if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -4688,7 +4716,7 @@ int ha_mcs_impl_group_by_end(TABLE* table) THD* thd = current_thd; cal_connection_info* ci = NULL; - if (thd->slave_thread && !ci->replicationEnabled && ( + if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -5306,7 +5334,7 @@ int ha_cs_impl_select_next(uchar* buf, TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread && !ci->replicationEnabled && ( + if (thd->slave_thread && !get_replication_slave(thd) && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index a9258f6fd..8e2ec327a 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -262,18 +262,10 @@ struct cal_connection_info useXbit(false), utf8(false), useCpimport(1), - delimiter('\7'), - replicationEnabled(false) + delimiter('\7') { // check if this is a slave mysql daemon isSlaveNode = checkSlave(); - - std::string option = config::Config::makeConfig()->getConfig("SystemConfig", "ReplicationEnabled"); - - if (!option.compare("Y")) - { - replicationEnabled = true; - } } static bool checkSlave() @@ -339,7 +331,6 @@ struct cal_connection_info char delimiter; char enclosed_by; std::vector columnTypes; - bool replicationEnabled; // MCOL-1101 remove compilation unit variable rmParms std::vector rmParms; }; diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index c9c7b7571..a0759d88c 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -276,6 +276,15 @@ static MYSQL_THDVAR_BOOL( 1 // default ); +static MYSQL_THDVAR_BOOL( + replication_slave, + PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY, + "Allow this MariaDB server to apply replication changes to ColumnStore", + NULL, + NULL, + 0 +); + st_mysql_sys_var* mcs_system_variables[] = { MYSQL_SYSVAR(compression_type), @@ -300,6 +309,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(import_for_batchinsert_delimiter), MYSQL_SYSVAR(import_for_batchinsert_enclosed_by), MYSQL_SYSVAR(varbin_always_hex), + MYSQL_SYSVAR(replication_slave), NULL }; @@ -520,3 +530,12 @@ void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value) { THDVAR(thd, import_for_batchinsert_enclosed_by) = value; } + +bool get_replication_slave(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, replication_slave); +} +void set_replication_slave(THD* thd, bool value) +{ + THDVAR(thd, replication_slave) = value; +} diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index ca6dcfdc1..e1c0d4731 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -102,5 +102,8 @@ void set_import_for_batchinsert_delimiter(THD* thd, ulong value); ulong get_import_for_batchinsert_enclosed_by(THD* thd); void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value); - + +bool get_replication_slave(THD* thd); +void set_replication_slave(THD* thd, bool value); + #endif diff --git a/utils/loggingcpp/ErrorMessage.txt b/utils/loggingcpp/ErrorMessage.txt index 814a51310..adec4cc88 100755 --- a/utils/loggingcpp/ErrorMessage.txt +++ b/utils/loggingcpp/ErrorMessage.txt @@ -143,6 +143,7 @@ 4016 ERR_DML_DDL_SLAVE DML and DDL statements for Columnstore tables can only be run from the replication master. 4017 ERR_DML_DDL_LOCAL DML and DDL statements are not allowed when columnstore_local_query is greater than 0. 4018 ERR_NON_SUPPORT_SYNTAX The statement is not supported in Columnstore. +4019 ERR_RBR_EVENT Row based replication events are not supported in Columnstore. # UDF 5001 ERR_FUNC_NON_IMPLEMENT %1%:%2% is not implemented.