1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4936 Disable binlog for DML statements.

DML statements executed on the primary node in a ColumnStore
cluster do not need to be written to the primary's binlog. This
is due to ColumnStore's distributed storage architecture.

With this patch, we disable writing to binlog when a DML statement
(INSERT/DELETE/UPDATE/LDI/INSERT..SELECT) is performed on a ColumnStore
table. HANDLER::external_lock() calls are used to
  1. Turn OFF the OPTION_BIN_LOG flag
  2. Turn ON the OPTION_BIN_TMP_LOG_OFF flag
in THD::variables.option_bits during a WRITE lock call.

THD::variables.option_bits is restored back to the original state
during the UNLOCK call in HANDLER::external_lock().

Further, isDMLStatement() function is added to reduce code verbosity
to check if a given statement is a DML statement.

Note that with this patch, not writing to primary's binlog means
DML replication from a ColumnStore cluster to another ColumnStore
cluster or to another foreign engine will not work.
This commit is contained in:
Gagan Goel
2022-01-03 18:36:48 -05:00
parent edf404724c
commit 195425924d
4 changed files with 89 additions and 72 deletions

View File

@ -2152,15 +2152,8 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows,
gwi.thd = thd; gwi.thd = thd;
int rc = 0; int rc = 0;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) &&
thd->lex->sql_command == SQLCOM_INSERT || isDMLStatement(thd->lex->sql_command))
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
{ {
if (affected_rows) if (affected_rows)
*affected_rows = 0; *affected_rows = 0;
@ -2189,15 +2182,8 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
gp_walk_info gwi; gp_walk_info gwi;
gwi.thd = thd; gwi.thd = thd;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) &&
thd->lex->sql_command == SQLCOM_INSERT || isDMLStatement(thd->lex->sql_command))
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0; return 0;
//check whether the system is ready to process statement. //check whether the system is ready to process statement.
@ -2569,15 +2555,8 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table)
{ {
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) &&
thd->lex->sql_command == SQLCOM_INSERT || isDMLStatement(thd->lex->sql_command))
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd)) if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
@ -2655,15 +2634,8 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
int rc = 0; int rc = 0;
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) &&
thd->lex->sql_command == SQLCOM_INSERT || isDMLStatement(thd->lex->sql_command))
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0; return 0;
cal_connection_info* ci = nullptr; cal_connection_info* ci = nullptr;
@ -3942,6 +3914,31 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condSt
return cond; return cond;
} }
inline void disableBinlogForDML(THD* thd)
{
if (isDMLStatement(thd->lex->sql_command) &&
(thd->variables.option_bits & OPTION_BIN_LOG))
{
set_original_option_bits(thd->variables.option_bits, thd);
thd->variables.option_bits &= ~OPTION_BIN_LOG;
thd->variables.option_bits |= OPTION_BIN_TMP_LOG_OFF;
}
}
inline void restoreBinlogForDML(THD* thd)
{
if (isDMLStatement(thd->lex->sql_command))
{
ulonglong orig_option_bits = get_original_option_bits(thd);
if (orig_option_bits)
{
thd->variables.option_bits = orig_option_bits;
set_original_option_bits(0, thd);
}
}
}
int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type) int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type)
{ {
// @bug 3014. Error out locking table command. IDB does not support it now. // @bug 3014. Error out locking table command. IDB does not support it now.
@ -4007,6 +4004,7 @@ int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type)
ci->physTablesList.erase(table); ci->physTablesList.erase(table);
thd->variables.in_subquery_conversion_threshold = IN_SUBQUERY_CONVERSION_THRESHOLD; thd->variables.in_subquery_conversion_threshold = IN_SUBQUERY_CONVERSION_THRESHOLD;
restore_optimizer_flags(thd); restore_optimizer_flags(thd);
restoreBinlogForDML(thd);
} }
else else
{ {
@ -4017,9 +4015,16 @@ int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type)
thd->variables.in_subquery_conversion_threshold=~ 0; thd->variables.in_subquery_conversion_threshold=~ 0;
// Early optimizer_switch changes to avoid unsupported opt-s. // Early optimizer_switch changes to avoid unsupported opt-s.
mutate_optimizer_flags(thd); mutate_optimizer_flags(thd);
// MCOL-4936 Disable binlog for DMLs
if (lock_type == 1)
{
disableBinlogForDML(thd);
}
} }
else if (lock_type == 2) else if (lock_type == 2)
{ {
restoreBinlogForDML(thd);
std::set<TABLE*>::iterator iter = ci->physTablesList.find(table); std::set<TABLE*>::iterator iter = ci->physTablesList.find(table);
if (iter != ci->physTablesList.end()) if (iter != ci->physTablesList.end())
{ {
@ -4527,15 +4532,8 @@ int ha_mcs_impl_group_by_next(TABLE* table)
{ {
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) &&
thd->lex->sql_command == SQLCOM_INSERT || isDMLStatement(thd->lex->sql_command))
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd)) if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
@ -4609,15 +4607,8 @@ int ha_mcs_impl_group_by_end(TABLE* table)
int rc = 0; int rc = 0;
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) &&
thd->lex->sql_command == SQLCOM_INSERT || isDMLStatement(thd->lex->sql_command))
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0; return 0;
cal_connection_info* ci = nullptr; cal_connection_info* ci = nullptr;
@ -4784,15 +4775,8 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
IDEBUG( cout << "pushdown_init for table " << endl ); IDEBUG( cout << "pushdown_init for table " << endl );
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) &&
thd->lex->sql_command == SQLCOM_INSERT || isDMLStatement(thd->lex->sql_command))
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0; return 0;
gp_walk_info gwi; gp_walk_info gwi;
@ -5235,15 +5219,8 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table)
{ {
THD* thd = current_thd; THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && ( if (thd->slave_thread && !get_replication_slave(thd) &&
thd->lex->sql_command == SQLCOM_INSERT || isDMLStatement(thd->lex->sql_command))
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
int rc = HA_ERR_END_OF_FILE; int rc = HA_ERR_END_OF_FILE;

View File

@ -444,6 +444,15 @@ inline bool isUpdateOrDeleteStatement(const enum_sql_command& command)
isDeleteStatement(command); isDeleteStatement(command);
} }
inline bool isDMLStatement(const enum_sql_command& command)
{
return (command == SQLCOM_INSERT ||
command == SQLCOM_INSERT_SELECT ||
command == SQLCOM_TRUNCATE ||
command == SQLCOM_LOAD ||
isUpdateOrDeleteStatement(command));
}
#ifdef DEBUG_WALK_COND #ifdef DEBUG_WALK_COND
void debug_walk(const Item* item, void* arg); void debug_walk(const Item* item, void* arg);
#endif #endif

View File

@ -77,6 +77,18 @@ static MYSQL_THDVAR_ULONGLONG(
1 1
); );
static MYSQL_THDVAR_ULONGLONG(
original_option_bits,
PLUGIN_VAR_NOSYSVAR | PLUGIN_VAR_NOCMDOPT,
"Storage for thd->variables.option_bits. For internal usage.",
NULL,
NULL,
0,
0,
~0U,
1
);
const char* mcs_select_handler_mode_values[] = { const char* mcs_select_handler_mode_values[] = {
"OFF", "OFF",
"ON", "ON",
@ -377,6 +389,7 @@ st_mysql_sys_var* mcs_system_variables[] =
MYSQL_SYSVAR(compression_type), MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(fe_conn_info_ptr), MYSQL_SYSVAR(fe_conn_info_ptr),
MYSQL_SYSVAR(original_optimizer_flags), MYSQL_SYSVAR(original_optimizer_flags),
MYSQL_SYSVAR(original_option_bits),
MYSQL_SYSVAR(select_handler), MYSQL_SYSVAR(select_handler),
MYSQL_SYSVAR(derived_handler), MYSQL_SYSVAR(derived_handler),
MYSQL_SYSVAR(group_by_handler), MYSQL_SYSVAR(group_by_handler),
@ -443,6 +456,21 @@ void set_original_optimizer_flags(ulonglong ptr, THD* thd)
THDVAR(current_thd, original_optimizer_flags) = (uint64_t)(ptr); THDVAR(current_thd, original_optimizer_flags) = (uint64_t)(ptr);
} }
ulonglong get_original_option_bits(THD* thd)
{
return (thd == NULL) ? 0 : THDVAR(thd, original_option_bits);
}
void set_original_option_bits(ulonglong value, THD* thd)
{
if (thd == NULL)
{
return;
}
THDVAR(thd, original_option_bits) = (uint64_t)(value);
}
mcs_select_handler_mode_t get_select_handler_mode(THD* thd) mcs_select_handler_mode_t get_select_handler_mode(THD* thd)
{ {
return ( thd == NULL ) ? mcs_select_handler_mode_t::ON : return ( thd == NULL ) ? mcs_select_handler_mode_t::ON :

View File

@ -64,6 +64,9 @@ void set_fe_conn_info_ptr(void* ptr, THD* thd = NULL);
ulonglong get_original_optimizer_flags(THD* thd = NULL); ulonglong get_original_optimizer_flags(THD* thd = NULL);
void set_original_optimizer_flags(ulonglong ptr, THD* thd = NULL); void set_original_optimizer_flags(ulonglong ptr, THD* thd = NULL);
ulonglong get_original_option_bits(THD* thd = NULL);
void set_original_option_bits(ulonglong value, THD* thd = NULL);
mcs_select_handler_mode_t get_select_handler_mode(THD* thd); mcs_select_handler_mode_t get_select_handler_mode(THD* thd);
void set_select_handler_mode(THD* thd, ulong value); void set_select_handler_mode(THD* thd, ulong value);