1
0
mirror of https://github.com/MariaDB/server.git synced 2026-01-06 05:22:24 +03:00

MDEV-6676: Optimistic parallel replication

Implement a new mode for parallel replication. In this mode, all transactions
are optimistically attempted applied in parallel. In case of conflicts, the
offending transaction is rolled back and retried later non-parallel.

This is an early-release patch to facilitate testing, more changes to user
interface / options will be expected. The new mode is not enabled by default.
This commit is contained in:
Kristian Nielsen
2014-12-05 16:09:48 +01:00
parent 1e3f09f163
commit db21fddc37
55 changed files with 2596 additions and 466 deletions

View File

@@ -250,7 +250,13 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
asynchroneously to allow the former to complete its commit.
In this case, we convert the 'killed' error into a deadlock error, and retry
the later transaction. */
the later transaction.
If we are doing optimistic parallel apply of transactions not known to be
safe, we convert any error to a deadlock error, but then at retry we will
wait for prior transactions to commit first, so that the retries can be
done non-speculative.
*/
static void
convert_kill_to_deadlock_error(rpl_group_info *rgi)
{
@@ -260,8 +266,10 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
if (!thd->get_stmt_da()->is_error())
return;
err_code= thd->get_stmt_da()->sql_errno();
if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
rgi->killed_for_retry)
if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
err_code != ER_PRIOR_COMMIT_FAILED) ||
((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
rgi->killed_for_retry))
{
thd->clear_error();
my_error(ER_LOCK_DEADLOCK, MYF(0));
@@ -354,9 +362,43 @@ do_retry:
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
mysql_mutex_lock(&entry->LOCK_parallel_entry);
register_wait_for_prior_event_group_commit(rgi, entry);
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
for (;;)
{
mysql_mutex_lock(&entry->LOCK_parallel_entry);
register_wait_for_prior_event_group_commit(rgi, entry);
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
if (rgi->speculation != rpl_group_info::SPECULATE_OPTIMISTIC)
break;
/*
We speculatively tried to run this in parallel with prior event groups,
but it did not work for some reason.
So let us wait for all prior transactions to complete before trying
again. This way, we avoid repeatedly retrying and failing a small
transaction that conflicts with a prior long-running one.
*/
if (!(err= thd->wait_for_prior_commit()))
{
rgi->speculation = rpl_group_info::SPECULATE_WAIT;
break;
}
convert_kill_to_deadlock_error(rgi);
if (!has_temporary_error(thd))
goto err;
/*
If we get a temporary error such as a deadlock kill, we can safely
ignore it, as we already rolled back.
But we still want to retry the wait for the prior transaction to
complete its commit.
*/
thd->clear_error();
if(thd->wait_for_commit_ptr)
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
}
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
@@ -550,6 +592,13 @@ handle_rpl_parallel_thread(void *arg)
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
thd->system_thread_info.rpl_sql_info= &sql_info;
/*
We need to use (at least) REPEATABLE READ isolation level. Otherwise
speculative parallel apply can run out-of-order and give wrong results
for statement-based replication.
*/
thd->variables.tx_isolation= ISO_REPEATABLE_READ;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
@@ -639,6 +688,7 @@ handle_rpl_parallel_thread(void *arg)
}
});
thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -662,11 +712,11 @@ handle_rpl_parallel_thread(void *arg)
in parallel with.
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (!gco->installed)
if (!(gco->flags & group_commit_orderer::INSTALLED))
{
if (gco->prev_gco)
gco->prev_gco->next_gco= gco;
gco->installed= true;
gco->flags|= group_commit_orderer::INSTALLED;
}
wait_count= gco->wait_count;
if (wait_count > entry->count_committing_event_groups)
@@ -766,6 +816,18 @@ handle_rpl_parallel_thread(void *arg)
/* We have to apply the event. */
}
}
/*
If we are optimistically running transactions in parallel, but this
particular event group should not run in parallel with what came
before, then wait now for the prior transaction to complete its
commit.
*/
if (rgi->speculation == rpl_group_info::SPECULATE_WAIT &&
(err= thd->wait_for_prior_commit()))
{
slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, 1);
}
}
group_ending= is_group_ending(qev->ev, event_type);
@@ -1319,7 +1381,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
gco->wait_count= wait_count;
gco->prev_gco= prev;
gco->next_gco= NULL;
gco->installed= false;
gco->flags= 0;
return gco;
}
@@ -1849,6 +1911,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
bool did_enter_cond= false;
PSI_stage_info old_stage;
DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
/* Handle master log name change, seen in Rotate_log_event. */
typ= ev->get_type_code();
if (unlikely(typ == ROTATE_EVENT))
@@ -1929,7 +1992,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
!(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ?
0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
@@ -1969,6 +2033,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
bool new_gco;
ulonglong mode= rli->mi->parallel_mode;
uchar gtid_flags= gtid_ev->flags2;
group_commit_orderer *gco;
uint8 force_switch_flag;
enum rpl_group_info::enum_speculation speculation;
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{
@@ -1995,19 +2065,87 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
rgi->wait_commit_sub_id= e->current_sub_id;
rgi->wait_commit_group_info= e->current_group_info;
if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
e->last_commit_id == gtid_ev->commit_id))
speculation= rpl_group_info::SPECULATE_NO;
new_gco= true;
force_switch_flag= 0;
gco= e->current_gco;
if (likely(gco))
{
uint8 flags= gco->flags;
if (!(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
e->last_commit_id != gtid_ev->commit_id)
flags|= group_commit_orderer::MULTI_BATCH;
/* Make sure we do not attempt to run DDL in parallel speculatively. */
if (gtid_flags & Gtid_log_event::FL_DDL)
flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH);
if (!(flags & group_commit_orderer::MULTI_BATCH))
{
/*
Still the same batch of event groups that group-committed together
on the master, so we can run in parallel.
*/
new_gco= false;
}
else if ((mode & SLAVE_PARALLEL_TRX) &&
!(flags & group_commit_orderer::FORCE_SWITCH))
{
/*
In transactional parallel mode, we optimistically attempt to run
non-DDL in parallel. In case of conflicts, we catch the conflict as
a deadlock or other error, roll back and retry serially.
The assumption is that only a few event groups will be
non-transactional or otherwise unsuitable for parallel apply. Those
transactions are still scheduled in parallel, but we set a flag that
will make the worker thread wait for everything before to complete
before starting.
*/
new_gco= false;
if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
((gtid_flags & Gtid_log_event::FL_WAITED) &&
!(mode & SLAVE_PARALLEL_WAITING)))
{
/*
This transaction should not be speculatively run in parallel with
what came before, either because it cannot safely be rolled back in
case of a conflict, or because it was marked as likely to conflict
and require expensive rollback and retry.
Here we mark it as such, and then the worker thread will do a
wait_for_prior_commit() before starting it. We do not introduce a
new group_commit_orderer, since we still want following transactions
to run in parallel with transactions prior to this one.
*/
speculation= rpl_group_info::SPECULATE_WAIT;
}
else
speculation= rpl_group_info::SPECULATE_OPTIMISTIC;
}
gco->flags= flags;
}
rgi->speculation= speculation;
if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
e->last_commit_id= gtid_ev->commit_id;
else
e->last_commit_id= 0;
if (new_gco)
{
/*
A new batch of transactions that group-committed together on the master.
Do not run this event group in parallel with what came before; instead
wait for everything prior to at least have started its commit phase, to
avoid any risk of performing any conflicting action too early.
Remember the count that marks the end of the previous group committed
batch, and allocate a new gco.
Remember the count that marks the end of the previous batch of event
groups that run in parallel, and allocate a new gco.
*/
uint64 count= e->count_queued_event_groups;
group_commit_orderer *gco;
if (!(gco= cur_thread->get_gco(count, e->current_gco)))
if (!(gco= cur_thread->get_gco(count, gco)))
{
cur_thread->free_rgi(rgi);
cur_thread->free_qev(qev);
@@ -2016,14 +2154,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
delete ev;
return 1;
}
e->current_gco= rgi->gco= gco;
gco->flags|= force_switch_flag;
e->current_gco= gco;
}
else
rgi->gco= e->current_gco;
if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
e->last_commit_id= gtid_ev->commit_id;
else
e->last_commit_id= 0;
rgi->gco= gco;
qev->rgi= e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
++e->count_queued_event_groups;