1
0
mirror of https://github.com/MariaDB/server.git synced 2025-10-12 12:25:37 +03:00

Merge branch '10.4' into bb-10.4-mdev16188

This commit is contained in:
Igor Babaev
2019-02-03 18:41:18 -08:00
2383 changed files with 85155 additions and 37192 deletions

View File

@@ -55,8 +55,12 @@
#include "semisync_master.h"
#include "wsrep_mysqld.h"
#include "wsrep.h"
#ifdef WITH_WSREP
#include "wsrep_binlog.h"
#include "wsrep_xid.h"
#include "wsrep_thd.h"
#include "wsrep_trans_observer.h" /* wsrep transaction hooks */
#endif /* WITH_WSREP */
/*
While we have legacy_db_type, we have this array to
@@ -252,6 +256,9 @@ handlerton *ha_checktype(THD *thd, handlerton *hton, bool no_substitute)
if (no_substitute)
return NULL;
#ifdef WITH_WSREP
(void)wsrep_after_rollback(thd, false);
#endif /* WITH_WSREP */
return ha_default_handlerton(thd);
} /* ha_checktype */
@@ -684,7 +691,7 @@ int ha_init()
binary log (which is considered a transaction-capable storage engine in
counting total_ha)
*/
opt_using_transactions= total_ha>(ulong)opt_bin_log;
opt_using_transactions= total_ha > (ulong) opt_bin_log;
savepoint_alloc_size+= sizeof(SAVEPOINT);
DBUG_RETURN(error);
}
@@ -694,7 +701,6 @@ int ha_end()
int error= 0;
DBUG_ENTER("ha_end");
/*
This should be eventualy based on the graceful shutdown flag.
So if flag is equal to HA_PANIC_CLOSE, the deallocate
@@ -824,6 +830,43 @@ void ha_kill_query(THD* thd, enum thd_kill_levels level)
}
/*****************************************************************************
Backup functions
******************************************************************************/
static my_bool plugin_prepare_for_backup(THD *unused1, plugin_ref plugin,
void *not_used)
{
handlerton *hton= plugin_hton(plugin);
if (hton->state == SHOW_OPTION_YES && hton->prepare_for_backup)
hton->prepare_for_backup();
return FALSE;
}
void ha_prepare_for_backup()
{
plugin_foreach_with_mask(0, plugin_prepare_for_backup,
MYSQL_STORAGE_ENGINE_PLUGIN,
PLUGIN_IS_DELETED|PLUGIN_IS_READY, 0);
}
static my_bool plugin_end_backup(THD *unused1, plugin_ref plugin,
void *not_used)
{
handlerton *hton= plugin_hton(plugin);
if (hton->state == SHOW_OPTION_YES && hton->end_backup)
hton->end_backup();
return FALSE;
}
void ha_end_backup()
{
plugin_foreach_with_mask(0, plugin_end_backup,
MYSQL_STORAGE_ENGINE_PLUGIN,
PLUGIN_IS_DELETED|PLUGIN_IS_READY, 0);
}
/* ========================================================================
======================= TRANSACTIONS ===================================*/
@@ -1164,17 +1207,28 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg)
static int prepare_or_error(handlerton *ht, THD *thd, bool all)
{
#ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION &&
wsrep_before_prepare(thd, all))
{
return(1);
}
#endif /* WITH_WSREP */
int err= ht->prepare(ht, thd, all);
status_var_increment(thd->status_var.ha_prepare_count);
if (err)
{
/* avoid sending error, if we're going to replay the transaction */
#ifdef WITH_WSREP
if (ht != wsrep_hton ||
err == EMSGSIZE || thd->wsrep_conflict_state != MUST_REPLAY)
#endif
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
}
#ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION &&
wsrep_after_prepare(thd, all))
{
err= 1;
}
#endif /* WITH_WSREP */
return err;
}
@@ -1359,7 +1413,7 @@ int ha_commit_trans(THD *thd, bool all)
}
#ifdef WITH_ARIA_STORAGE_ENGINE
ha_maria::implicit_commit(thd, TRUE);
ha_maria::implicit_commit(thd, TRUE);
#endif
if (!ha_info)
@@ -1369,6 +1423,12 @@ int ha_commit_trans(THD *thd, bool all)
*/
if (is_real_trans)
thd->transaction.cleanup();
#ifdef WITH_WSREP
if (WSREP(thd) && all && !error)
{
wsrep_commit_empty(thd, all);
}
#endif /* WITH_WSREP */
DBUG_RETURN(0);
}
@@ -1396,8 +1456,7 @@ int ha_commit_trans(THD *thd, bool all)
We allow the owner of FTWRL to COMMIT; we assume that it knows
what it does.
*/
mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
MDL_EXPLICIT);
mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT);
if (!WSREP(thd) &&
thd->mdl_context.acquire_lock(&mdl_request,
@@ -1455,7 +1514,28 @@ int ha_commit_trans(THD *thd, bool all)
if (trans->no_2pc || (rw_ha_count <= 1))
{
#ifdef WITH_WSREP
/*
This commit will not go through log_and_order() where wsrep commit
ordering is normally done. Commit ordering must be done here.
*/
bool run_wsrep_commit= (WSREP(thd) &&
rw_ha_count &&
wsrep_thd_is_local(thd) &&
wsrep_has_changes(thd, all));
if (run_wsrep_commit)
error= wsrep_before_commit(thd, all);
if (error)
{
ha_rollback_trans(thd, FALSE);
goto wsrep_err;
}
#endif /* WITH_WSREP */
error= ha_commit_one_phase(thd, all);
#ifdef WITH_WSREP
if (run_wsrep_commit)
error= wsrep_after_commit(thd, all);
#endif /* WITH_WSREP */
goto done;
}
@@ -1487,10 +1567,14 @@ int ha_commit_trans(THD *thd, bool all)
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
#ifdef WITH_WSREP
if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid))
if (!error && WSREP_ON)
{
// xid was rewritten by wsrep
xid= wsrep_xid_seqno(thd->transaction.xid_state.xid);
wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid);
if (!s.is_undefined())
{
// xid was rewritten by wsrep
xid= s.get();
}
}
#endif /* WITH_WSREP */
@@ -1499,18 +1583,35 @@ int ha_commit_trans(THD *thd, bool all)
error= commit_one_phase_2(thd, all, trans, is_real_trans);
goto done;
}
#ifdef WITH_WSREP
if (wsrep_before_commit(thd, all))
goto wsrep_err;
#endif /* WITH_WSREP */
DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
need_commit_ordered);
if (!cookie)
{
WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
goto err;
}
DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order");
DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
#ifdef WITH_WSREP
if (error || wsrep_after_commit(thd, all))
{
mysql_mutex_lock(&thd->LOCK_thd_data);
if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
{
mysql_mutex_unlock(&thd->LOCK_thd_data);
(void)tc_log->unlog(cookie, xid);
goto wsrep_err;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
#endif /* WITH_WSREP */
DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
if (tc_log->unlog(cookie, xid))
{
@@ -1532,6 +1633,19 @@ done:
goto end;
/* Come here if error and we need to rollback. */
#ifdef WITH_WSREP
wsrep_err:
mysql_mutex_lock(&thd->LOCK_thd_data);
if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
{
WSREP_DEBUG("BF abort has happened after prepare & certify");
mysql_mutex_unlock(&thd->LOCK_thd_data);
ha_rollback_trans(thd, TRUE);
}
else
mysql_mutex_unlock(&thd->LOCK_thd_data);
#endif /* WITH_WSREP */
err:
error= 1; /* Transaction was rolled back */
/*
@@ -1541,7 +1655,11 @@ err:
*/
if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec))
ha_rollback_trans(thd, all);
else
{
WSREP_DEBUG("rollback skipped %p %d",thd->rgi_slave,
thd->rgi_slave->is_parallel_exec);
}
end:
if (rw_trans && mdl_request.ticket)
{
@@ -1553,6 +1671,13 @@ end:
*/
thd->mdl_context.release_lock(mdl_request.ticket);
}
#ifdef WITH_WSREP
if (WSREP(thd) && all && !error && (rw_ha_count == 0))
{
wsrep_commit_empty(thd, all);
}
#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
@@ -1710,6 +1835,9 @@ int ha_rollback_trans(THD *thd, bool all)
DBUG_RETURN(1);
}
#ifdef WITH_WSREP
(void) wsrep_before_rollback(thd, all);
#endif /* WITH_WSREP */
if (ha_info)
{
/* Close all cursors that can not survive ROLLBACK */
@@ -1725,9 +1853,9 @@ int ha_rollback_trans(THD *thd, bool all)
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
#ifdef WITH_WSREP
WSREP_WARN("handlerton rollback failed, thd %llu %lld conf %d SQL %s",
thd->thread_id, thd->query_id, thd->wsrep_conflict_state,
thd->query());
WSREP_WARN("handlerton rollback failed, thd %lld %lld conf %d SQL %s",
thd->thread_id, thd->query_id, thd->wsrep_trx().state(),
thd->query());
#endif /* WITH_WSREP */
}
status_var_increment(thd->status_var.ha_rollback_count);
@@ -1746,6 +1874,15 @@ int ha_rollback_trans(THD *thd, bool all)
thd->transaction.xid_state.xa_state != XA_NOTR)
thd->transaction.xid_state.rm_error= thd->get_stmt_da()->sql_errno();
#ifdef WITH_WSREP
if (thd->is_error())
{
WSREP_DEBUG("ha_rollback_trans(%lld, %s) rolled back: %s: %s; is_real %d",
thd->thread_id, all?"TRUE":"FALSE", WSREP_QUERY(thd),
thd->get_stmt_da()->message(), is_real_trans);
}
(void) wsrep_after_rollback(thd, all);
#endif /* WITH_WSREP */
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
{
@@ -1879,6 +2016,28 @@ static char* xid_to_str(char *buf, XID *xid)
}
#endif
#ifdef WITH_WSREP
static my_xid wsrep_order_and_check_continuity(XID *list, int len)
{
wsrep_sort_xid_array(list, len);
wsrep::gtid cur_position= wsrep_get_SE_checkpoint();
long long cur_seqno= cur_position.seqno().get();
for (int i= 0; i < len; ++i)
{
if (!wsrep_is_wsrep_xid(list + i) ||
wsrep_xid_seqno(list + i) != cur_seqno + 1)
{
WSREP_WARN("Discovered discontinuity in recovered wsrep "
"transaction XIDs. Truncating the recovery list to "
"%d entries", i);
break;
}
++cur_seqno;
}
WSREP_INFO("Last wsrep seqno to be recovered %lld", cur_seqno);
return (cur_seqno < 0 ? 0 : cur_seqno);
}
#endif /* WITH_WSREP */
/**
recover() step of xa.
@@ -1916,10 +2075,32 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
{
sql_print_information("Found %d prepared transaction(s) in %s",
got, hton_name(hton)->str);
#ifdef WITH_WSREP
/* If wsrep_on=ON, XIDs are first ordered and then the range of
recovered XIDs is checked for continuity. All the XIDs which
are in continuous range can be safely committed if binlog
is off since they have already ordered and certified in the
cluster.
The discontinuity of wsrep XIDs may happen because the GTID
is assigned for transaction in wsrep_before_prepare(), but the
commit order is entered in wsrep_before_commit(). This means that
transactions may run prepare step out of order and may
result in gap in wsrep XIDs. This can be the case for example
if we have T1 with seqno 1 and T2 with seqno 2 and the server
crashes after T2 finishes prepare step but before T1 starts
the prepare.
*/
my_xid wsrep_limit= 0;
if (WSREP_ON)
{
wsrep_limit= wsrep_order_and_check_continuity(info->list, got);
}
#endif /* WITH_WSREP */
for (int i=0; i < got; i ++)
{
my_xid x= IF_WSREP(WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ?
wsrep_xid_seqno(info->list[i]) :
wsrep_xid_seqno(&info->list[i]) :
info->list[i].get_my_xid(),
info->list[i].get_my_xid());
if (!x) // not "mine" - that is generated by external TM
@@ -1938,9 +2119,12 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
continue;
}
// recovery mode
if (info->commit_list ?
my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
if (IF_WSREP((wsrep_emulate_bin_log &&
wsrep_is_wsrep_xid(info->list + i) &&
x <= wsrep_limit), false) ||
(info->commit_list ?
my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT))
{
#ifndef DBUG_OFF
int rc=
@@ -2298,11 +2482,26 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
{
int err;
handlerton *ht= ha_info->ht();
#ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION)
{
WSREP_DEBUG("ha_rollback_to_savepoint: run before_rollbackha_rollback_trans hook");
(void) wsrep_before_rollback(thd, !thd->in_sub_stmt);
}
#endif // WITH_WSREP
if ((err= ht->rollback(ht, thd, !thd->in_sub_stmt)))
{ // cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
}
#ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION)
{
WSREP_DEBUG("ha_rollback_to_savepoint: run after_rollback hook");
(void) wsrep_after_rollback(thd, !thd->in_sub_stmt);
}
#endif // WITH_WSREP
status_var_increment(thd->status_var.ha_rollback_count);
ha_info_next= ha_info->next();
ha_info->reset(); /* keep it conveniently zero-filled */
@@ -2319,6 +2518,16 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
*/
int ha_savepoint(THD *thd, SAVEPOINT *sv)
{
#ifdef WITH_WSREP
/*
Register binlog hton for savepoint processing if wsrep binlog
emulation is on.
*/
if (WSREP_EMULATE_BINLOG(thd) && wsrep_thd_is_local(thd))
{
wsrep_register_binlog_handler(thd, thd->in_multi_stmt_transaction_mode());
}
#endif /* WITH_WSREP */
int error=0;
THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
&thd->transaction.all);
@@ -5966,6 +6175,12 @@ bool handler::check_table_binlog_row_based(bool binlog_row)
return false;
if (unlikely((table->in_use->variables.sql_log_bin_off)))
return 0; /* Called by partitioning engine */
#ifdef WITH_WSREP
if (!table->in_use->variables.sql_log_bin &&
wsrep_thd_is_applying(table->in_use))
return 0; /* wsrep patch sets sql_log_bin to silence binlogging
from high priority threads */
#endif /* WITH_WSREP */
if (unlikely((!check_table_binlog_row_based_done)))
{
check_table_binlog_row_based_done= 1;
@@ -5996,12 +6211,12 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
Otherwise, return 'true' if binary logging is on.
*/
IF_WSREP(((WSREP_EMULATE_BINLOG(thd) &&
(thd->wsrep_exec_mode != REPL_RECV)) ||
wsrep_thd_is_local(thd)) ||
((WSREP(thd) ||
(thd->variables.option_bits & OPTION_BIN_LOG)) &&
mysql_bin_log.is_open())),
(thd->variables.option_bits & OPTION_BIN_LOG) &&
mysql_bin_log.is_open()));
(thd->variables.option_bits & OPTION_BIN_LOG) &&
mysql_bin_log.is_open()));
}
@@ -6126,23 +6341,9 @@ int binlog_log_row(TABLE* table, const uchar *before_record,
/* only InnoDB tables will be replicated through binlog emulation */
if ((WSREP_EMULATE_BINLOG(thd) &&
table->file->partition_ht()->db_type != DB_TYPE_INNODB) ||
(thd->wsrep_ignore_table == true))
!(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
thd->wsrep_ignore_table == true)
return 0;
/* enforce wsrep_max_ws_rows */
if (WSREP(thd) && table->s->tmp_table == NO_TMP_TABLE)
{
thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
thd->wsrep_exec_mode != REPL_RECV &&
thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
trans_rollback_stmt(thd) || trans_rollback(thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
return ER_ERROR_DURING_COMMIT;
}
}
#endif
if (!table->file->check_table_binlog_row_based(1))
@@ -6255,6 +6456,27 @@ int handler::ha_reset()
DBUG_RETURN(reset());
}
#ifdef WITH_WSREP
static int wsrep_after_row(THD *thd)
{
DBUG_ENTER("wsrep_after_row");
/* enforce wsrep_max_ws_rows */
thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
wsrep_thd_is_local(thd) &&
thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
trans_rollback_stmt(thd) || trans_rollback(thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
DBUG_RETURN(ER_ERROR_DURING_COMMIT);
}
else if (wsrep_after_row(thd, false))
{
DBUG_RETURN(ER_LOCK_DEADLOCK);
}
DBUG_RETURN(0);
}
#endif /* WITH_WSREP */
int handler::ha_write_row(uchar *buf)
{
@@ -6277,7 +6499,15 @@ int handler::ha_write_row(uchar *buf)
{
rows_changed++;
error= binlog_log_row(table, 0, buf, log_func);
#ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
{
DBUG_RETURN(error);
}
#endif /* WITH_WSREP */
}
DEBUG_SYNC_C("ha_write_row_end");
DBUG_RETURN(error);
}
@@ -6309,6 +6539,13 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
{
rows_changed++;
error= binlog_log_row(table, old_data, new_data, log_func);
#ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
{
return error;
}
#endif /* WITH_WSREP */
}
return error;
}
@@ -6364,6 +6601,13 @@ int handler::ha_delete_row(const uchar *buf)
{
rows_changed++;
error= binlog_log_row(table, buf, 0, log_func);
#ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
{
return error;
}
#endif /* WITH_WSREP */
}
return error;
}
@@ -6553,7 +6797,7 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
DBUG_ENTER("ha_abort_transaction");
if (!WSREP(bf_thd) &&
!(bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU &&
bf_thd->wsrep_exec_mode == TOTAL_ORDER)) {
wsrep_thd_is_toi(bf_thd))) {
DBUG_RETURN(0);
}
@@ -6569,54 +6813,6 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
DBUG_RETURN(0);
}
void ha_fake_trx_id(THD *thd)
{
DBUG_ENTER("ha_fake_trx_id");
bool no_fake_trx_id= true;
if (!WSREP(thd))
{
DBUG_VOID_RETURN;
}
if (thd->wsrep_ws_handle.trx_id != WSREP_UNDEFINED_TRX_ID)
{
WSREP_DEBUG("fake trx id skipped: %" PRIu64, thd->wsrep_ws_handle.trx_id);
DBUG_VOID_RETURN;
}
/* Try statement transaction if standard one is not set. */
THD_TRANS *trans= (thd->transaction.all.ha_list) ? &thd->transaction.all :
&thd->transaction.stmt;
Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
for (; ha_info; ha_info= ha_info_next)
{
handlerton *hton= ha_info->ht();
if (hton->fake_trx_id)
{
hton->fake_trx_id(hton, thd);
/* Got a fake trx id. */
no_fake_trx_id= false;
/*
We need transaction ID from just one storage engine providing
fake_trx_id (which will most likely be the case).
*/
break;
}
ha_info_next= ha_info->next();
}
if (unlikely(no_fake_trx_id))
WSREP_WARN("Cannot get fake transaction ID from storage engine.");
DBUG_VOID_RETURN;
}
#endif /* WITH_WSREP */
@@ -6904,6 +7100,10 @@ int del_global_index_stat(THD *thd, TABLE* table, KEY* key_info)
DBUG_RETURN(res);
}
/*****************************************************************************
VERSIONING functions
******************************************************************************/
bool Vers_parse_info::is_start(const char *name) const
{
DBUG_ASSERT(name);