1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-593 Add optional MariaDB replication support

This patch will allow MariaDB replication into UM1 when enabling the
following is added to the SystemConfig section of Columnstore.xml:

<ReplicationEnabled>Y</ReplicationEnabled>

The intended use case is to replication from an InnoDB MariaDB server
into ColumnStore. You would need to create the tables on the ColumnStore
slave as "ColumnStore" and the same tables in the master as InnoDB.

At the moment the use case is narrow and could be prone to problems so
this will use the hidden flag until we can improve it.
This commit is contained in:
Andrew Hutchings
2019-04-15 14:45:34 +01:00
parent 67b42acc9c
commit f9f966fe96
4 changed files with 97 additions and 62 deletions

View File

@ -925,7 +925,6 @@ uint32_t doUpdateDelete(THD* thd)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
//@bug 5660. Error out DDL/DML on slave node, or on local query node
if (ci->isSlaveNode && !thd->slave_thread)
{
string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE);
@ -947,7 +946,14 @@ uint32_t doUpdateDelete(THD* thd)
// stats start
ci->stats.reset();
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
@ -2871,8 +2877,9 @@ int ha_calpont_impl_rnd_init(TABLE* table)
// prevent "create table as select" from running on slave
thd->infinidb_vtable.hasInfiniDBTable = true;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
@ -2925,8 +2932,6 @@ int ha_calpont_impl_rnd_init(TABLE* table)
if (!thd->infinidb_vtable.cal_conn_info)
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
idbassert(ci != 0);
// MySQL sometimes calls rnd_init multiple times, plan should only be
@ -3057,7 +3062,14 @@ int ha_calpont_impl_rnd_init(TABLE* table)
{
ci->stats.reset(); // reset query stats
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
@ -3422,8 +3434,9 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table)
{
THD* thd = current_thd;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
@ -3434,7 +3447,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
return ER_INTERNAL_ERROR;
@ -3463,8 +3475,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table)
if (!thd->infinidb_vtable.cal_conn_info)
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
// @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
@ -3547,8 +3557,17 @@ int ha_calpont_impl_rnd_end(TABLE* table)
int rc = 0;
THD* thd = current_thd;
cal_connection_info* ci = NULL;
bool replicationEnabled = false;
if (thd->slave_thread && (
if (thd->infinidb_vtable.cal_conn_info)
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
if (ci && ci->replicationEnabled)
{
replicationEnabled = true;
}
if (thd->slave_thread && !replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
@ -3561,9 +3580,6 @@ int ha_calpont_impl_rnd_end(TABLE* table)
thd->infinidb_vtable.isNewQuery = true;
if (thd->infinidb_vtable.cal_conn_info)
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY )
{
thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE; // flip back to normal state
@ -3861,9 +3877,8 @@ int ha_calpont_impl_write_row(uchar* buf, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
if (thd->slave_thread) return 0;
if (thd->slave_thread && !ci->replicationEnabled)
return 0;
if (ci->alterTableState > 0) return 0;
@ -3948,7 +3963,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE)
thd->infinidb_vtable.isInfiniDBDML = true;
if (thd->slave_thread) return;
if (thd->slave_thread && !ci->replicationEnabled)
return;
//@bug 5660. Error out DDL/DML on slave node, or on local query node
if (ci->isSlaveNode && thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE)
@ -4422,7 +4438,14 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
// query stats. only collect execution time and rows inserted for insert/load_data_infile
ci->stats.reset();
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
@ -4508,7 +4531,8 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
if (thd->slave_thread) return 0;
if (thd->slave_thread && !ci->replicationEnabled)
return 0;
int rc = 0;
@ -5209,7 +5233,14 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
{
ci->stats.reset(); // reset query stats
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
@ -5628,19 +5659,6 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
{
THD* thd = current_thd;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
return ER_INTERNAL_ERROR;
@ -5666,6 +5684,17 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
// @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
@ -5750,18 +5779,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
THD* thd = current_thd;
cal_connection_info* ci = NULL;
if (thd->slave_thread && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
thd->infinidb_vtable.isNewQuery = true;
thd->infinidb_vtable.isUnion = false;
@ -5775,6 +5792,23 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
// return rc;
//}
if (!ci)
{
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
}
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
{
@ -5801,12 +5835,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
}
}
if (!ci)
{
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
}
// @bug 3078. Also session limit variable works the same as ctrl+c
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD ||
((thd->lex)->sql_command != SQLCOM_INSERT &&