1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Replication improvements

This patch fixes:

MCOL-3557 - Row Based Replication events to ColumnStore tables will no
longer cause MariaDB to crash, it will error instead.

MCOL-3556 - Remove the Columnstore.xml variable to turn on ColumnStore
tables applying replication events and instead make it a system variable
that can be set in my.cnf called "columnstore_replication_slave". This
allows it to be set per-UM.
This commit is contained in:
Andrew Hutchings
2019-10-11 16:54:41 +01:00
parent 8476c81255
commit 20c1949152
7 changed files with 98 additions and 35 deletions

View File

@ -2360,7 +2360,7 @@ int ha_mcs_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* crea
if ( schemaSyncOnly && isCreate) if ( schemaSyncOnly && isCreate)
return rc; return rc;
if (thd->slave_thread && !ci.replicationEnabled) if (thd->slave_thread && !get_replication_slave(thd))
return rc; return rc;
//@bug 5660. Error out REAL DDL/DML on slave node. //@bug 5660. Error out REAL DDL/DML on slave node.
@ -2558,7 +2558,7 @@ int ha_mcs_impl_delete_table_(const char* db, const char* name, cal_connection_i
return 0; return 0;
} }
if (thd->slave_thread && !ci.replicationEnabled) if (thd->slave_thread && !get_replication_slave(thd))
return 0; return 0;
//@bug 5660. Error out REAL DDL/DML on slave node. //@bug 5660. Error out REAL DDL/DML on slave node.
@ -2697,7 +2697,7 @@ int ha_mcs_impl_rename_table_(const char* from, const char* to, cal_connection_i
pair<string, string> toPair; pair<string, string> toPair;
string stmt; string stmt;
if (thd->slave_thread && !ci.replicationEnabled) if (thd->slave_thread && !get_replication_slave(thd))
return 0; return 0;
//@bug 5660. Error out REAL DDL/DML on slave node. //@bug 5660. Error out REAL DDL/DML on slave node.

View File

@ -159,7 +159,17 @@ int ProcessCommandStatement(THD* thd, string& dmlStatement, cal_connection_info&
//@Bug 2721 and 2722. Log the statement before issuing commit/rollback //@Bug 2721 and 2722. Log the statement before issuing commit/rollback
if ( dmlStatement == "LOGGING" ) if ( dmlStatement == "LOGGING" )
{ {
VendorDMLStatement cmdStmt(idb_mysql_query_str(thd), DML_COMMAND, sessionID); char* query_char = idb_mysql_query_str(thd);
std::string query_str;
if (!query_char)
{
query_str = "<Replication event>";
}
else
{
query_str = query_char;
}
VendorDMLStatement cmdStmt(query_str, DML_COMMAND, sessionID);
cmdStmt.set_Logging( false ); cmdStmt.set_Logging( false );
pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt); pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
pDMLPackage->set_Logging( false ); pDMLPackage->set_Logging( false );
@ -250,7 +260,18 @@ int doProcessInsertValues ( TABLE* table, uint32_t size, cal_connection_info& ci
int rc = 0; int rc = 0;
VendorDMLStatement dmlStmts(idb_mysql_query_str(thd), DML_INSERT, table->s->table_name.str, char* query_char = idb_mysql_query_str(thd);
std::string query_str;
if (!query_char)
{
query_str = "<Replication event>";
}
else
{
query_str = query_char;
}
VendorDMLStatement dmlStmts(query_str, DML_INSERT, table->s->table_name.str,
table->s->db.str, size, ci.colNameList.size(), ci.colNameList, table->s->db.str, size, ci.colNameList.size(), ci.colNameList,
ci.tableValuesMap, ci.nullValuesBitset, sessionID); ci.tableValuesMap, ci.nullValuesBitset, sessionID);
@ -2009,7 +2030,7 @@ int ha_mcs_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connection_in
{ {
int rc = 0; int rc = 0;
if (thd->slave_thread && !ci.replicationEnabled) if (thd->slave_thread && !get_replication_slave(thd))
return 0; return 0;
std::string command("COMMIT"); std::string command("COMMIT");

View File

@ -1255,6 +1255,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
args.add("Update"); args.add("Update");
else if (thd->get_command() == COM_SLAVE_SQL)
args.add("Row based replication event");
else else
args.add("Delete"); args.add("Delete");
@ -1282,7 +1284,16 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
} }
// @bug 1127. Re-construct update stmt using lex instead of using the original query. // @bug 1127. Re-construct update stmt using lex instead of using the original query.
string dmlStmt = string(idb_mysql_query_str(thd)); char* query_char = idb_mysql_query_str(thd);
std::string dmlStmt;
if (!query_char)
{
dmlStmt = "<Replication event>";
}
else
{
dmlStmt = query_char;
}
string schemaName; string schemaName;
string tableName(""); string tableName("");
string aliasName(""); string aliasName("");
@ -1555,6 +1566,12 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
} }
} }
} }
else if (thd->get_command() == COM_SLAVE_SQL)
{
string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_RBR_EVENT);
setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
return ER_CHECK_NOT_IMPLEMENTED;
}
else else
{ {
updateCP->queryType(CalpontSelectExecutionPlan::DELETE); updateCP->queryType(CalpontSelectExecutionPlan::DELETE);
@ -1684,7 +1701,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
pDMLPackage->set_IsFromCol( true ); pDMLPackage->set_IsFromCol( true );
//cout << " setting isFromCol to " << isFromCol << endl; //cout << " setting isFromCol to " << isFromCol << endl;
string origStmt(idb_mysql_query_str(thd)); std::string origStmt = dmlStmt;
origStmt += ";"; origStmt += ";";
pDMLPackage->set_SQLStatement( origStmt ); pDMLPackage->set_SQLStatement( origStmt );
@ -1733,7 +1750,17 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
updateCP->txnID(txnID.id); updateCP->txnID(txnID.id);
updateCP->verID(verID); updateCP->verID(verID);
updateCP->sessionID(sessionID); updateCP->sessionID(sessionID);
updateCP->data(idb_mysql_query_str(thd)); char* query_char = idb_mysql_query_str(thd);
std::string query_str;
if (!query_char)
{
query_str = "<Replication event>";
}
else
{
query_str = query_char;
}
updateCP->data(query_str);
try try
{ {
@ -2300,7 +2327,7 @@ int ha_mcs_impl_rnd_init(TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE || thd->lex->sql_command == SQLCOM_UPDATE ||
@ -2311,6 +2338,13 @@ int ha_mcs_impl_rnd_init(TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD)) thd->lex->sql_command == SQLCOM_LOAD))
return 0; return 0;
if (thd->slave_thread && thd->get_command() == COM_SLAVE_SQL)
{
string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_RBR_EVENT);
setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
return ER_CHECK_NOT_IMPLEMENTED;
}
if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE ) if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
return 0; return 0;
@ -2632,7 +2666,7 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE || thd->lex->sql_command == SQLCOM_UPDATE ||
@ -2718,17 +2752,11 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
int rc = 0; int rc = 0;
THD* thd = current_thd; THD* thd = current_thd;
cal_connection_info* ci = NULL; cal_connection_info* ci = NULL;
bool replicationEnabled = false;
if (get_fe_conn_info_ptr() != NULL) if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (ci && ci->replicationEnabled) if (thd->slave_thread && !get_replication_slave(thd) && (
{
replicationEnabled = true;
}
if (thd->slave_thread && !replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE || thd->lex->sql_command == SQLCOM_UPDATE ||
@ -2966,7 +2994,7 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled) if (thd->slave_thread && !get_replication_slave(thd))
return 0; return 0;
if (ci->alterTableState > 0) return 0; if (ci->alterTableState > 0) return 0;
@ -3043,7 +3071,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
if (ci->alterTableState > 0) return; if (ci->alterTableState > 0) return;
if (thd->slave_thread && !ci->replicationEnabled) if (thd->slave_thread && !get_replication_slave(thd))
return; return;
//@bug 5660. Error out DDL/DML on slave node, or on local query node //@bug 5660. Error out DDL/DML on slave node, or on local query node
@ -3576,7 +3604,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled) if (thd->slave_thread && !get_replication_slave(thd))
return 0; return 0;
int rc = 0; int rc = 0;
@ -4612,7 +4640,7 @@ int ha_mcs_impl_group_by_next(TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE || thd->lex->sql_command == SQLCOM_UPDATE ||
@ -4688,7 +4716,7 @@ int ha_mcs_impl_group_by_end(TABLE* table)
THD* thd = current_thd; THD* thd = current_thd;
cal_connection_info* ci = NULL; cal_connection_info* ci = NULL;
if (thd->slave_thread && !ci->replicationEnabled && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE || thd->lex->sql_command == SQLCOM_UPDATE ||
@ -5306,7 +5334,7 @@ int ha_cs_impl_select_next(uchar* buf, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled && ( if (thd->slave_thread && !get_replication_slave(thd) && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE || thd->lex->sql_command == SQLCOM_UPDATE ||

View File

@ -262,18 +262,10 @@ struct cal_connection_info
useXbit(false), useXbit(false),
utf8(false), utf8(false),
useCpimport(1), useCpimport(1),
delimiter('\7'), delimiter('\7')
replicationEnabled(false)
{ {
// check if this is a slave mysql daemon // check if this is a slave mysql daemon
isSlaveNode = checkSlave(); isSlaveNode = checkSlave();
std::string option = config::Config::makeConfig()->getConfig("SystemConfig", "ReplicationEnabled");
if (!option.compare("Y"))
{
replicationEnabled = true;
}
} }
static bool checkSlave() static bool checkSlave()
@ -339,7 +331,6 @@ struct cal_connection_info
char delimiter; char delimiter;
char enclosed_by; char enclosed_by;
std::vector <execplan::CalpontSystemCatalog::ColType> columnTypes; std::vector <execplan::CalpontSystemCatalog::ColType> columnTypes;
bool replicationEnabled;
// MCOL-1101 remove compilation unit variable rmParms // MCOL-1101 remove compilation unit variable rmParms
std::vector <execplan::RMParam> rmParms; std::vector <execplan::RMParam> rmParms;
}; };

View File

@ -276,6 +276,15 @@ static MYSQL_THDVAR_BOOL(
1 // default 1 // default
); );
static MYSQL_THDVAR_BOOL(
replication_slave,
PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY,
"Allow this MariaDB server to apply replication changes to ColumnStore",
NULL,
NULL,
0
);
st_mysql_sys_var* mcs_system_variables[] = st_mysql_sys_var* mcs_system_variables[] =
{ {
MYSQL_SYSVAR(compression_type), MYSQL_SYSVAR(compression_type),
@ -300,6 +309,7 @@ st_mysql_sys_var* mcs_system_variables[] =
MYSQL_SYSVAR(import_for_batchinsert_delimiter), MYSQL_SYSVAR(import_for_batchinsert_delimiter),
MYSQL_SYSVAR(import_for_batchinsert_enclosed_by), MYSQL_SYSVAR(import_for_batchinsert_enclosed_by),
MYSQL_SYSVAR(varbin_always_hex), MYSQL_SYSVAR(varbin_always_hex),
MYSQL_SYSVAR(replication_slave),
NULL NULL
}; };
@ -520,3 +530,12 @@ void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value)
{ {
THDVAR(thd, import_for_batchinsert_enclosed_by) = value; THDVAR(thd, import_for_batchinsert_enclosed_by) = value;
} }
bool get_replication_slave(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, replication_slave);
}
void set_replication_slave(THD* thd, bool value)
{
THDVAR(thd, replication_slave) = value;
}

View File

@ -102,5 +102,8 @@ void set_import_for_batchinsert_delimiter(THD* thd, ulong value);
ulong get_import_for_batchinsert_enclosed_by(THD* thd); ulong get_import_for_batchinsert_enclosed_by(THD* thd);
void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value); void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value);
bool get_replication_slave(THD* thd);
void set_replication_slave(THD* thd, bool value);
#endif #endif

View File

@ -143,6 +143,7 @@
4016 ERR_DML_DDL_SLAVE DML and DDL statements for Columnstore tables can only be run from the replication master. 4016 ERR_DML_DDL_SLAVE DML and DDL statements for Columnstore tables can only be run from the replication master.
4017 ERR_DML_DDL_LOCAL DML and DDL statements are not allowed when columnstore_local_query is greater than 0. 4017 ERR_DML_DDL_LOCAL DML and DDL statements are not allowed when columnstore_local_query is greater than 0.
4018 ERR_NON_SUPPORT_SYNTAX The statement is not supported in Columnstore. 4018 ERR_NON_SUPPORT_SYNTAX The statement is not supported in Columnstore.
4019 ERR_RBR_EVENT Row based replication events are not supported in Columnstore.
# UDF # UDF
5001 ERR_FUNC_NON_IMPLEMENT %1%:%2% is not implemented. 5001 ERR_FUNC_NON_IMPLEMENT %1%:%2% is not implemented.