1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-02 17:22:27 +03:00

MCOL-4805 For functions in the plugin code that disable replication on the

slave threads, we now check for this condition early on in the function
block.
This commit is contained in:
Gagan Goel
2021-08-03 22:49:22 +00:00
parent c16b0f6ad7
commit afb638b9bd
3 changed files with 69 additions and 63 deletions

View File

@ -2215,6 +2215,9 @@ int ha_mcs_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* crea
#endif #endif
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd))
return 0;
char* query = thd->query(); char* query = thd->query();
if (!query) if (!query)
@ -2313,9 +2316,6 @@ int ha_mcs_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* crea
if ( schemaSyncOnly && isCreate) if ( schemaSyncOnly && isCreate)
return rc; return rc;
if (thd->slave_thread && !get_replication_slave(thd))
return rc;
//@bug 5660. Error out REAL DDL/DML on slave node. //@bug 5660. Error out REAL DDL/DML on slave node.
// When the statement gets here, it's NOT SSO or RESTRICT // When the statement gets here, it's NOT SSO or RESTRICT
if (ci.isSlaveNode) if (ci.isSlaveNode)
@ -2529,6 +2529,10 @@ int ha_mcs_impl_delete_table_(const char* db, const char* name, cal_connection_i
cout << "ha_mcs_impl_delete_table: " << db << name << endl; cout << "ha_mcs_impl_delete_table: " << db << name << endl;
#endif #endif
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd))
return 0;
char* query = thd->query(); char* query = thd->query();
if (!query) if (!query)
@ -2548,9 +2552,6 @@ int ha_mcs_impl_delete_table_(const char* db, const char* name, cal_connection_i
return 0; return 0;
} }
if (thd->slave_thread && !get_replication_slave(thd))
return 0;
//@bug 5660. Error out REAL DDL/DML on slave node. //@bug 5660. Error out REAL DDL/DML on slave node.
// When the statement gets here, it's NOT SSO or RESTRICT // When the statement gets here, it's NOT SSO or RESTRICT
if (ci.isSlaveNode) if (ci.isSlaveNode)
@ -2588,6 +2589,10 @@ int ha_mcs_impl_delete_table_(const char* db, const char* name, cal_connection_i
int ha_mcs_impl_rename_table_(const char* from, const char* to, cal_connection_info& ci) int ha_mcs_impl_rename_table_(const char* from, const char* to, cal_connection_info& ci)
{ {
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd))
return 0;
string emsg; string emsg;
string tblFrom (from+2); string tblFrom (from+2);
@ -2602,9 +2607,6 @@ int ha_mcs_impl_rename_table_(const char* from, const char* to, cal_connection_i
string stmt; string stmt;
if (thd->slave_thread && !get_replication_slave(thd))
return 0;
// This is a temporary table rename, we don't use the temporary table name // This is a temporary table rename, we don't use the temporary table name
// so this is a NULL op // so this is a NULL op
if (tblFrom.compare(0, 4, "#sql") == 0) if (tblFrom.compare(0, 4, "#sql") == 0)

View File

@ -948,11 +948,11 @@ std::string ha_mcs_impl_cleartablelock(
int ha_mcs_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connection_info& ci ) int ha_mcs_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connection_info& ci )
{ {
int rc = 0;
if (thd->slave_thread && !get_replication_slave(thd)) if (thd->slave_thread && !get_replication_slave(thd))
return 0; return 0;
int rc = 0;
std::string command("COMMIT"); std::string command("COMMIT");
#ifdef INFINIDB_DEBUG #ifdef INFINIDB_DEBUG
cout << "COMMIT" << endl; cout << "COMMIT" << endl;

View File

@ -2191,6 +2191,17 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
gp_walk_info gwi; gp_walk_info gwi;
gwi.thd = thd; gwi.thd = thd;
if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
//check whether the system is ready to process statement. //check whether the system is ready to process statement.
#ifndef _MSC_VER #ifndef _MSC_VER
static DBRM dbrm(true); static DBRM dbrm(true);
@ -2219,17 +2230,6 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
#if 0 #if 0
if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0) if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
{ {
@ -2571,8 +2571,6 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table)
{ {
THD* thd = current_thd; THD* thd = current_thd;
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
@ -2584,6 +2582,8 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD)) thd->lex->sql_command == SQLCOM_LOAD))
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (isUpdateOrDeleteStatement(thd->lex->sql_command, !isForeignTableUpdate(thd))) if (isUpdateOrDeleteStatement(thd->lex->sql_command, !isForeignTableUpdate(thd)))
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
@ -2656,10 +2656,6 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
{ {
int rc = 0; int rc = 0;
THD* thd = current_thd; THD* thd = current_thd;
cal_connection_info* ci = nullptr;
if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
@ -2672,8 +2668,11 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
thd->lex->sql_command == SQLCOM_LOAD)) thd->lex->sql_command == SQLCOM_LOAD))
return 0; return 0;
cal_connection_info* ci = nullptr;
if (get_fe_conn_info_ptr() != NULL) if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE ) if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
return rc; return rc;
@ -2863,6 +2862,10 @@ int ha_mcs_impl_delete_table(const char* name)
int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed) int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed)
{ {
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd))
return 0;
// Error out INSERT on VIEW. It's currently not supported. // Error out INSERT on VIEW. It's currently not supported.
// @note INSERT on VIEW works natually (for simple cases at least), but we choose to turn it off // @note INSERT on VIEW works natually (for simple cases at least), but we choose to turn it off
// for now - ZZ. // for now - ZZ.
@ -2886,11 +2889,9 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed)
if (rows_changed == 0) if (rows_changed == 0)
ci->tableValuesMap.clear(); ci->tableValuesMap.clear();
if (thd->slave_thread && !get_replication_slave(thd)) if (ci->alterTableState > 0)
return 0; return 0;
if (ci->alterTableState > 0) return 0;
ha_rows rowsInserted = 0; ha_rows rowsInserted = 0;
int rc = 0; int rc = 0;
@ -2960,6 +2961,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
{ {
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd))
return;
if (get_fe_conn_info_ptr() == nullptr) if (get_fe_conn_info_ptr() == nullptr)
set_fe_conn_info_ptr((void*)new cal_connection_info()); set_fe_conn_info_ptr((void*)new cal_connection_info());
@ -2968,9 +2972,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
// clear rows variable // clear rows variable
ci->rowsHaveInserted = 0; ci->rowsHaveInserted = 0;
if (ci->alterTableState > 0) return; if (ci->alterTableState > 0)
if (thd->slave_thread && !get_replication_slave(thd))
return; return;
//@bug 5660. Error out DDL/DML on slave node, or on local query node //@bug 5660. Error out DDL/DML on slave node, or on local query node
@ -3504,6 +3506,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
bitmap_clear_all(table->read_set); bitmap_clear_all(table->read_set);
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd))
return 0;
std::string aTmpDir(startup::StartUp::tmpDir()); std::string aTmpDir(startup::StartUp::tmpDir());
if (get_fe_conn_info_ptr() == nullptr) if (get_fe_conn_info_ptr() == nullptr)
@ -3511,9 +3516,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !get_replication_slave(thd))
return 0;
int rc = 0; int rc = 0;
if (ci->rc == 5) //read only dbrm if (ci->rc == 5) //read only dbrm
@ -4527,14 +4529,6 @@ int ha_mcs_impl_group_by_next(TABLE* table)
{ {
THD* thd = current_thd; THD* thd = current_thd;
if (isUpdateOrDeleteStatement(thd->lex->sql_command, !isForeignTableUpdate(thd)))
return HA_ERR_END_OF_FILE;
if (get_fe_conn_info_ptr() == nullptr)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
@ -4546,6 +4540,14 @@ int ha_mcs_impl_group_by_next(TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD)) thd->lex->sql_command == SQLCOM_LOAD))
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
if (isUpdateOrDeleteStatement(thd->lex->sql_command, !isForeignTableUpdate(thd)))
return HA_ERR_END_OF_FILE;
if (get_fe_conn_info_ptr() == nullptr)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{ {
force_close_fep_conn(thd, ci); force_close_fep_conn(thd, ci);
@ -4608,7 +4610,6 @@ int ha_mcs_impl_group_by_end(TABLE* table)
{ {
int rc = 0; int rc = 0;
THD* thd = current_thd; THD* thd = current_thd;
cal_connection_info* ci = nullptr;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
@ -4621,6 +4622,8 @@ int ha_mcs_impl_group_by_end(TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD)) thd->lex->sql_command == SQLCOM_LOAD))
return 0; return 0;
cal_connection_info* ci = nullptr;
if (get_fe_conn_info_ptr() != NULL) if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
@ -4783,6 +4786,17 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
IDEBUG( cout << "pushdown_init for table " << endl ); IDEBUG( cout << "pushdown_init for table " << endl );
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
gp_walk_info gwi; gp_walk_info gwi;
gwi.thd = thd; gwi.thd = thd;
bool err = false; bool err = false;
@ -4818,17 +4832,6 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
return 0; return 0;
} }
if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
// MCOL-4023 We need to test this code path. // MCOL-4023 We need to test this code path.
// Update and delete code // Update and delete code
if (isUpdateOrDeleteStatement(thd->lex->sql_command)) if (isUpdateOrDeleteStatement(thd->lex->sql_command))
@ -5232,14 +5235,8 @@ internal_error:
int ha_mcs_impl_select_next(uchar* buf, TABLE* table) int ha_mcs_impl_select_next(uchar* buf, TABLE* table)
{ {
int rc = HA_ERR_END_OF_FILE;
THD* thd = current_thd; THD* thd = current_thd;
if (get_fe_conn_info_ptr() == nullptr)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
@ -5251,6 +5248,13 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD)) thd->lex->sql_command == SQLCOM_LOAD))
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
int rc = HA_ERR_END_OF_FILE;
if (get_fe_conn_info_ptr() == nullptr)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (isUpdateOrDeleteStatement(thd->lex->sql_command)) if (isUpdateOrDeleteStatement(thd->lex->sql_command))
return rc; return rc;