
Merge branch 'mdev7818-4' into 10.1

Conflicts:
	mysql-test/suite/perfschema/r/stage_mdl_global.result
	sql/rpl_rli.cc
	sql/sql_parse.cc
This commit is contained in:
Kristian Nielsen
2015-11-13 14:24:40 +01:00
11 changed files with 642 additions and 88 deletions


@@ -44,6 +44,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
  rgi->event_relay_log_pos= qev->event_relay_log_pos;
  rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
  strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
  if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
        (ev->when == 0)))
    rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
  mysql_mutex_lock(&rli->data_lock);
  /* Mutex will be released in apply_event_and_update_pos(). */
  err= apply_event_and_update_pos(ev, thd, rgi, rpt);
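Context for the lines above: the worker now records the master-side completion time of each real event (ev->when plus its execution time) in rgi->last_master_timestamp, which is the kind of timestamp that lag reporting such as Seconds_Behind_Master is derived from. A minimal standalone illustration of deriving a lag value from such a timestamp (plain C++, not the server's actual reporting code; the function name is made up):

#include <ctime>

// Illustrative only: approximate the slave's lag from the last recorded
// master-side event timestamp. A last_master_timestamp of 0 means "no event
// applied yet".
static long report_lag_seconds(time_t last_master_timestamp)
{
  if (last_master_timestamp == 0)
    return 0;                                   // nothing applied yet
  long lag= static_cast<long>(std::time(nullptr) - last_master_timestamp);
  return lag < 0 ? 0 : lag;                     // clamp clock skew to zero
}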
@@ -271,6 +274,284 @@ register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
}
/*
  Do not start parallel execution of this event group until all prior groups
  that are not safe to run in parallel with it have reached the commit phase.
*/
static bool
do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
            bool *did_enter_cond, PSI_stage_info *old_stage)
{
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  uint64 wait_count;
  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
  if (!gco->installed)
  {
    group_commit_orderer *prev_gco= gco->prev_gco;
    if (prev_gco)
    {
      prev_gco->last_sub_id= gco->prior_sub_id;
      prev_gco->next_gco= gco;
    }
    gco->installed= true;
  }
  wait_count= gco->wait_count;
  if (wait_count > entry->count_committing_event_groups)
  {
    DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
    thd->ENTER_COND(&gco->COND_group_commit_orderer,
                    &entry->LOCK_parallel_entry,
                    &stage_waiting_for_prior_transaction_to_start_commit,
                    old_stage);
    *did_enter_cond= true;
    do
    {
      if (thd->check_killed() && !rgi->worker_error)
      {
        DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
        thd->clear_error();
        thd->get_stmt_da()->reset_diagnostics_area();
        thd->send_kill_message();
        slave_output_error_info(rgi, thd);
        signal_error_to_sql_driver_thread(thd, rgi, 1);
        /*
          Even though we were killed, we need to continue waiting for the
          prior event groups to signal that we can continue. Otherwise we
          mess up the accounting for ordering. However, now that we have
          marked the error, events will just be skipped rather than
          executed, and things will progress quickly towards stop.
        */
      }
      mysql_cond_wait(&gco->COND_group_commit_orderer,
                      &entry->LOCK_parallel_entry);
    } while (wait_count > entry->count_committing_event_groups);
  }
  if (entry->force_abort && wait_count > entry->stop_count)
  {
    /*
      We are stopping (STOP SLAVE), and this event group is beyond the point
      where we can safely stop. So return a flag that will cause us to skip,
      rather than execute, the following events.
    */
    return true;
  }
  else
    return false;
}
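The wait in do_gco_wait() above follows a standard condition-variable pattern: publish this group's orderer once, then block until the number of event groups that have reached the commit phase catches up with this group's wait_count, re-checking the condition after every wakeup. A stripped-down standalone sketch of that pattern (standard C++; committing_groups and the member names are placeholders, not the server's types):

#include <condition_variable>
#include <cstdint>
#include <mutex>

// Illustrative sketch of the "wait until enough prior groups have reached the
// commit phase" pattern used by do_gco_wait(). Not MariaDB code.
struct commit_orderer_demo
{
  std::mutex lock;                  // stands in for LOCK_parallel_entry
  std::condition_variable cond;     // stands in for COND_group_commit_orderer
  uint64_t committing_groups= 0;    // like count_committing_event_groups

  // Worker side: block until at least wait_count prior groups are committing.
  void wait_for_prior(uint64_t wait_count)
  {
    std::unique_lock<std::mutex> guard(lock);
    // The predicate is re-evaluated after every wakeup, exactly like the
    // do { ... } while (wait_count > count) loop above.
    cond.wait(guard, [&] { return committing_groups >= wait_count; });
  }

  // Committer side: one more group reached the commit phase; wake waiters.
  void group_reached_commit()
  {
    { std::lock_guard<std::mutex> guard(lock); ++committing_groups; }
    cond.notify_all();
  }
};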
static void
do_ftwrl_wait(rpl_group_info *rgi,
              bool *did_enter_cond, PSI_stage_info *old_stage)
{
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  uint64 sub_id= rgi->gtid_sub_id;
  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
  /*
    If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this
    transaction is later than transactions that have priority to complete
    before FTWRL. If so, wait here so that FTWRL can proceed and complete
    first.
    (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes
    this test false as required).
  */
  if (unlikely(sub_id > entry->pause_sub_id))
  {
    thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
                    &stage_waiting_for_ftwrl, old_stage);
    *did_enter_cond= true;
    do
    {
      if (entry->force_abort || rgi->worker_error)
        break;
      if (thd->check_killed())
      {
        thd->send_kill_message();
        slave_output_error_info(rgi, thd);
        signal_error_to_sql_driver_thread(thd, rgi, 1);
        break;
      }
      mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
    } while (sub_id > entry->pause_sub_id);
    /*
      We do not call EXIT_COND() here, as this will be done later by our
      caller (since we set *did_enter_cond to true).
    */
  }
  if (sub_id > entry->largest_started_sub_id)
    entry->largest_started_sub_id= sub_id;
}
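The cheap common-case check in do_ftwrl_wait() relies on entry->pause_sub_id using ULONGLONG_MAX as a "no FTWRL pending" sentinel, so unaffected transactions pay only a single comparison. A tiny standalone illustration of that sentinel convention (standard C++, placeholder names, not server code):

#include <cstdint>
#include <limits>

// Illustrative sketch of the pause_sub_id sentinel used by do_ftwrl_wait().
static constexpr uint64_t NO_PAUSE= std::numeric_limits<uint64_t>::max();

// True while a FLUSH TABLES WITH READ LOCK pause is in effect.
static bool ftwrl_pending(uint64_t pause_sub_id)
{
  return pause_sub_id != NO_PAUSE;
}

// Transactions with sub_id <= pause_sub_id were already started when the
// pause began and may finish; later ones must wait. While pause_sub_id holds
// the NO_PAUSE sentinel this is always false, so the unpaused fast path is a
// single compare.
static bool must_wait_for_ftwrl(uint64_t my_sub_id, uint64_t pause_sub_id)
{
  return my_sub_id > pause_sub_id;
}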
static int
pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
{
  PSI_stage_info old_stage;
  int res= 0;
  /*
    Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
    READ LOCK work correctly, without incurring extra locking penalties in
    normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
    thread pool, and for this we need to make sure the pool will not go away
    during the operation. The LOCK_rpl_thread_pool is not suitable for
    this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
    must be released before locking any LOCK_rpl_thread lock, or a deadlock
    can occur.
    So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
    pool size changes with this condition wait.
  */
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  if (thd)
    thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
                    &stage_waiting_for_rpl_thread_pool, &old_stage);
  while (pool->busy)
  {
    if (thd && thd->check_killed())
    {
      thd->send_kill_message();
      res= 1;
      break;
    }
    mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
  }
  if (!res)
    pool->busy= true;
  if (thd)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
  return res;
}
static void
pool_mark_not_busy(rpl_parallel_thread_pool *pool)
{
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  DBUG_ASSERT(pool->busy);
  pool->busy= false;
  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
}
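pool_mark_busy() and pool_mark_not_busy() above implement a "busy" flag guarded by the pool mutex: contenders (FTWRL and pool resizing) wait on the condition variable, at most one holds the flag at a time, and releasing it broadcasts so other contenders and get_thread() callers can re-check. A standalone sketch of that protocol in standard C++ (the kill-check callback stands in for THD::check_killed(); not the MariaDB implementation):

#include <condition_variable>
#include <functional>
#include <mutex>

// Illustrative sketch of the pool_mark_busy()/pool_mark_not_busy() protocol.
class busy_guard_demo
{
  std::mutex lock;                  // stands in for LOCK_rpl_thread_pool
  std::condition_variable cond;     // stands in for COND_rpl_thread_pool
  bool busy= false;

public:
  // Returns 0 on success, 1 if the optional kill check fired while waiting
  // (in which case the busy flag is NOT taken, mirroring pool_mark_busy()).
  int mark_busy(std::function<bool()> killed= {})
  {
    std::unique_lock<std::mutex> guard(lock);
    while (busy)
    {
      if (killed && killed())
        return 1;
      cond.wait(guard);
    }
    busy= true;
    return 0;
  }

  void mark_not_busy()
  {
    { std::lock_guard<std::mutex> guard(lock); busy= false; }
    cond.notify_all();              // wake other mark_busy() and get_thread() waiters
  }
};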
void
rpl_unpause_after_ftwrl(THD *thd)
{
  uint32 i;
  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
  DBUG_ASSERT(pool->busy);
  for (i= 0; i < pool->count; ++i)
  {
    rpl_parallel_entry *e;
    rpl_parallel_thread *rpt= pool->threads[i];
    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    if (!rpt->current_owner)
    {
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      continue;
    }
    e= rpt->current_entry;
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    rpt->pause_for_ftwrl = false;
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    e->pause_sub_id= (uint64)ULONGLONG_MAX;
    mysql_cond_broadcast(&e->COND_parallel_entry);
    mysql_mutex_unlock(&e->LOCK_parallel_entry);
  }
  pool_mark_not_busy(pool);
}
/*
  Pause the parallel replication worker threads, so that FLUSH TABLES WITH
  READ LOCK can proceed without event groups committing underneath it.
  Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called.
*/
int
rpl_pause_for_ftwrl(THD *thd)
{
  uint32 i;
  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
  int err;
  /*
    While the count_pending_pause_for_ftwrl counter is non-zero, the pool
    cannot be shutdown/resized, so threads are guaranteed to not disappear.
    This is required to safely be able to access the individual threads below.
    (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
    as this can deadlock against release_thread()).
  */
  if ((err= pool_mark_busy(pool, thd)))
    return err;
  for (i= 0; i < pool->count; ++i)
  {
    PSI_stage_info old_stage;
    rpl_parallel_entry *e;
    rpl_parallel_thread *rpt= pool->threads[i];
    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    if (!rpt->current_owner)
    {
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      continue;
    }
    e= rpt->current_entry;
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    /*
      Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not
      de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl().
    */
    rpt->pause_for_ftwrl = true;
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    ++e->need_sub_id_signal;
    if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
      e->pause_sub_id= e->largest_started_sub_id;
    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                    &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
    while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
           e->last_committed_sub_id < e->pause_sub_id &&
           !err)
    {
      if (thd->check_killed())
      {
        thd->send_kill_message();
        err= 1;
        break;
      }
      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
    };
    --e->need_sub_id_signal;
    thd->EXIT_COND(&old_stage);
    if (err)
      break;
  }
  if (err)
    rpl_unpause_after_ftwrl(thd);
  return err;
}
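Taken together, rpl_pause_for_ftwrl() and rpl_unpause_after_ftwrl() are intended to bracket the global read lock: pause the workers, do the FTWRL work, then unpause; and per the note above, an error return from the pause call means the caller must not unpause, since the failed pause already undid its own pausing. A hedged sketch of that calling pattern (the caller and perform_ftwrl_work() are hypothetical stand-ins, not actual server code):

// Illustrative calling pattern only. The declarations match the two functions
// added above; perform_ftwrl_work() is a hypothetical stand-in for the FLUSH
// TABLES WITH READ LOCK processing itself.
class THD;
int rpl_pause_for_ftwrl(THD *thd);
void rpl_unpause_after_ftwrl(THD *thd);
int perform_ftwrl_work(THD *thd);     // hypothetical

static int do_ftwrl_with_paused_slaves(THD *thd)
{
  int err= rpl_pause_for_ftwrl(thd);
  if (err)
    return err;                       // failed pause already cleaned up after
                                      // itself; do NOT call unpause here
  err= perform_ftwrl_work(thd);       // hypothetical FTWRL critical section
  rpl_unpause_after_ftwrl(thd);       // always unpause after a successful pause
  return err;
}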
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
@@ -792,7 +1073,6 @@ handle_rpl_parallel_thread(void *arg)
      {
        bool did_enter_cond= false;
        PSI_stage_info old_stage;
        uint64 wait_count;
        DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
            if (rgi->current_gtid.domain_id == 0 &&
@@ -831,72 +1111,19 @@ handle_rpl_parallel_thread(void *arg)
        event_gtid_sub_id= rgi->gtid_sub_id;
        rgi->thd= thd;
        mysql_mutex_lock(&entry->LOCK_parallel_entry);
        skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
        if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
          skip_event_group= true;
        if (likely(!skip_event_group))
          do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
        /*
          Register ourself to wait for the previous commit, if we need to do
          such registration _and_ that previous commit has not already
          occurred.
          Also do not start parallel execution of this event group until all
          prior groups have reached the commit phase that are not safe to run
          in parallel with.
        */
        mysql_mutex_lock(&entry->LOCK_parallel_entry);
        if (!gco->installed)
        {
          group_commit_orderer *prev_gco= gco->prev_gco;
          if (prev_gco)
          {
            prev_gco->last_sub_id= gco->prior_sub_id;
            prev_gco->next_gco= gco;
          }
          gco->installed= true;
        }
        wait_count= gco->wait_count;
        if (wait_count > entry->count_committing_event_groups)
        {
          DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
          thd->ENTER_COND(&gco->COND_group_commit_orderer,
                          &entry->LOCK_parallel_entry,
                          &stage_waiting_for_prior_transaction_to_start_commit,
                          &old_stage);
          did_enter_cond= true;
          do
          {
            if (thd->check_killed() && !rgi->worker_error)
            {
              DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
              thd->clear_error();
              thd->get_stmt_da()->reset_diagnostics_area();
              thd->send_kill_message();
              slave_output_error_info(rgi, thd);
              signal_error_to_sql_driver_thread(thd, rgi, 1);
              /*
                Even though we were killed, we need to continue waiting for the
                prior event groups to signal that we can continue. Otherwise we
                mess up the accounting for ordering. However, now that we have
                marked the error, events will just be skipped rather than
                executed, and things will progress quickly towards stop.
              */
            }
            mysql_cond_wait(&gco->COND_group_commit_orderer,
                            &entry->LOCK_parallel_entry);
          } while (wait_count > entry->count_committing_event_groups);
        }
        if (entry->force_abort && wait_count > entry->stop_count)
        {
          /*
            We are stopping (STOP SLAVE), and this event group is beyond the
            point where we can safely stop. So set a flag that will cause us
            to skip, rather than execute, the following events.
          */
          skip_event_group= true;
        }
        else
          skip_event_group= false;
        if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
          skip_event_group= true;
        register_wait_for_prior_event_group_commit(rgi, entry);
        unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
@@ -1060,17 +1287,40 @@ handle_rpl_parallel_thread(void *arg)
    */
    rpt->batch_free();
    if ((events= rpt->event_queue) != NULL)
    for (;;)
    {
      if ((events= rpt->event_queue) != NULL)
      {
        /*
          Take next group of events from the replication pool.
          This is faster than having to wake up the pool manager thread to give
          us a new event.
        */
        rpt->dequeue1(events);
        mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
        goto more_events;
      }
      if (!rpt->pause_for_ftwrl ||
          (in_event_group && !group_rgi->parallel_entry->force_abort))
        break;
      /*
        Take next group of events from the replication pool.
        This is faster than having to wake up the pool manager thread to give us
        a new event.
        We are currently in the delicate process of pausing parallel
        replication while FLUSH TABLES WITH READ LOCK is starting. We must
        not de-allocate the thread (setting rpt->current_owner= NULL) until
        rpl_unpause_after_ftwrl() has woken us up.
      */
      rpt->dequeue1(events);
      mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry);
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      goto more_events;
      mysql_cond_wait(&rpt->current_entry->COND_parallel_entry,
                      &rpt->current_entry->LOCK_parallel_entry);
      mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry);
      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
      /*
        Now loop to check again for more events available, since we released
        and re-acquired the LOCK_rpl_thread mutex.
      */
    }
    rpt->inuse_relaylog_refcount_update();
    if (in_event_group && group_rgi->parallel_entry->force_abort)
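The rewritten fetch loop above turns a one-shot queue check into a retry loop: take any queued events, otherwise keep the thread alive (instead of handing it back to the pool) while rpt->pause_for_ftwrl is set, waiting on the entry's condition variable and re-checking everything after re-acquiring LOCK_rpl_thread. A simplified standalone sketch of that shape (standard C++; the field names are placeholders and the in_event_group/force_abort refinement is omitted):

#include <condition_variable>
#include <deque>
#include <mutex>

// Illustrative sketch of the worker's "fetch more events or wait while paused
// for FTWRL" loop; not the actual handle_rpl_parallel_thread() code.
struct worker_demo
{
  std::mutex thread_lock;              // stands in for LOCK_rpl_thread
  std::mutex entry_lock;               // stands in for LOCK_parallel_entry
  std::condition_variable entry_cond;  // stands in for COND_parallel_entry
  std::deque<int> event_queue;         // queued events (ints for illustration)
  bool pause_for_ftwrl= false;         // set by the pause, cleared by unpause

  // Returns true with a batch of events, or false when the queue is empty and
  // the thread may be handed back to the pool.
  bool fetch_or_wait(std::deque<int> &batch)
  {
    std::unique_lock<std::mutex> guard(thread_lock);
    for (;;)
    {
      if (!event_queue.empty())
      {
        batch.swap(event_queue);       // take the whole queued batch at once
        return true;
      }
      if (!pause_for_ftwrl)
        return false;                  // nothing queued, not paused: give up the thread
      // Paused for FTWRL: must not release the thread. Wait for the unpause
      // broadcast, then loop and re-check under the re-acquired thread lock.
      std::unique_lock<std::mutex> entry_guard(entry_lock);
      guard.unlock();                  // same unlock/relock order as above
      entry_cond.wait(entry_guard);
      entry_guard.unlock();
      guard.lock();
    }
  }
};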
@@ -1148,6 +1398,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
  rpl_parallel_thread **new_list= NULL;
  rpl_parallel_thread *new_free_list= NULL;
  rpl_parallel_thread *rpt_array= NULL;
  int res;
  if ((res= pool_mark_busy(pool, current_thd)))
    return res;
  /*
    Allocate the new list of threads up-front.
@@ -1196,7 +1450,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
  */
  for (i= 0; i < pool->count; ++i)
  {
    rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
    rpl_parallel_thread *rpt;
    mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
    while ((rpt= pool->free_list) == NULL)
      mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
    pool->free_list= rpt->next;
    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    rpt->stop= true;
    mysql_cond_signal(&rpt->COND_rpl_thread);
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -1250,9 +1511,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
    mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
  }
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
  pool_mark_not_busy(pool);
  return 0;
@@ -1276,6 +1535,7 @@ err:
    }
    my_free(new_list);
  }
  pool_mark_not_busy(pool);
  return 1;
}
@@ -1538,7 +1798,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
  : count(0), threads(0), free_list(0), inited(false)
  : threads(0), free_list(0), count(0), inited(false), busy(false)
{
}
@@ -1546,9 +1806,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool()
int
rpl_parallel_thread_pool::init(uint32 size)
{
  count= 0;
  threads= NULL;
  free_list= NULL;
  count= 0;
  busy= false;
  mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
                   MY_MUTEX_INIT_SLOW);
@@ -1589,7 +1850,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
  rpl_parallel_thread *rpt;
  mysql_mutex_lock(&LOCK_rpl_thread_pool);
  while ((rpt= free_list) == NULL)
  while (unlikely(busy) || !(rpt= free_list))
    mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
  free_list= rpt->next;
  mysql_mutex_unlock(&LOCK_rpl_thread_pool);
@@ -1800,6 +2061,7 @@ rpl_parallel::find(uint32 domain_id)
    e->rpl_thread_max= count;
    e->domain_id= domain_id;
    e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
    e->pause_sub_id= (uint64)ULONGLONG_MAX;
    if (my_hash_insert(&domain_hash, (uchar *)e))
    {
      my_free(e);
@@ -2001,7 +2263,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    e->need_sub_id_signal= true;
    ++e->need_sub_id_signal;
    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                    &stage_waiting_for_workers_idle, &old_stage);
    while (e->current_sub_id > e->last_committed_sub_id)
@@ -2014,7 +2276,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
      }
      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
    }
    e->need_sub_id_signal= false;
    --e->need_sub_id_signal;
    thd->EXIT_COND(&old_stage);
    if (err)
      return err;
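Finally, need_sub_id_signal changes from a bool to a counter because two independent waiters, wait_for_workers_idle() and rpl_pause_for_ftwrl(), may now overlap: each increments it on entry and decrements on exit, and the committing side keeps signalling as long as the count is non-zero. A minimal standalone sketch of that refcount-style request (standard C++; in the server the counter is only touched under LOCK_parallel_entry):

#include <cassert>
#include <cstdint>

// Illustrative sketch of turning a boolean "please signal me" flag into a
// counter so that several independent waiters can overlap. Not MariaDB code.
struct signal_request_demo
{
  uint32_t need_sub_id_signal= 0;   // number of waiters currently interested

  void waiter_enter() { ++need_sub_id_signal; }
  void waiter_exit()
  {
    assert(need_sub_id_signal > 0);
    --need_sub_id_signal;
  }

  // Committing side: only broadcast while at least one waiter has asked.
  bool should_signal() const { return need_sub_id_signal != 0; }
};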