mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
MDEV-4506: Parallel replication: After-review fixes.
This commit is contained in:
@ -132,7 +132,6 @@ handle_rpl_parallel_thread(void *arg)
|
||||
while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
|
||||
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
|
||||
/* Mark that this thread is now executing */
|
||||
rpt->free= false;
|
||||
rpt->event_queue= rpt->last_in_queue= NULL;
|
||||
thd->exit_cond(old_msg);
|
||||
|
||||
@ -216,12 +215,24 @@ handle_rpl_parallel_thread(void *arg)
|
||||
{
|
||||
in_event_group= false;
|
||||
|
||||
/*
|
||||
Remove any left-over registration to wait for a prior commit to
|
||||
complete. Normally, such wait would already have been removed at
|
||||
this point by wait_for_prior_commit(), but eg. in error case we
|
||||
might have skipped waiting, so we would need to remove it explicitly.
|
||||
*/
|
||||
rgi->commit_orderer.unregister_wait_for_prior_commit();
|
||||
thd->wait_for_commit_ptr= NULL;
|
||||
|
||||
/*
|
||||
Record that we have finished, so other event groups will no
|
||||
longer attempt to wait for us to commit.
|
||||
Record that this event group has finished (eg. transaction is
|
||||
committed, if transactional), so other event groups will no longer
|
||||
attempt to wait for us to commit. Once we have increased
|
||||
entry->last_committed_sub_id, no other threads will execute
|
||||
register_wait_for_prior_commit() against us. Thus, by doing one
|
||||
extra (usually redundant) wakeup_subsequent_commits() we can ensure
|
||||
that no register_wait_for_prior_commit() can ever happen without a
|
||||
subsequent wakeup_subsequent_commits() to wake it up.
|
||||
|
||||
We can race here with the next transactions, but that is fine, as
|
||||
long as we check that we do not decrease last_committed_sub_id. If
|
||||
@ -246,6 +257,11 @@ handle_rpl_parallel_thread(void *arg)
|
||||
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
||||
if ((events= rpt->event_queue) != NULL)
|
||||
{
|
||||
/*
|
||||
Take next group of events from the replication pool.
|
||||
This is faster than having to wakeup the pool manager thread to give us
|
||||
a new event.
|
||||
*/
|
||||
rpt->event_queue= rpt->last_in_queue= NULL;
|
||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||
goto more_events;
|
||||
@ -254,7 +270,7 @@ handle_rpl_parallel_thread(void *arg)
|
||||
if (!in_event_group)
|
||||
{
|
||||
rpt->current_entry= NULL;
|
||||
if (!rpt->stop && !rpt->free)
|
||||
if (!rpt->stop)
|
||||
{
|
||||
mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
|
||||
list= rpt->pool->free_list;
|
||||
@ -263,7 +279,6 @@ handle_rpl_parallel_thread(void *arg)
|
||||
if (!list)
|
||||
mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool);
|
||||
mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool);
|
||||
rpt->free= true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -300,6 +315,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
uint32 i;
|
||||
rpl_parallel_thread **new_list= NULL;
|
||||
rpl_parallel_thread *new_free_list= NULL;
|
||||
rpl_parallel_thread *rpt_array= NULL;
|
||||
|
||||
/*
|
||||
Allocate the new list of threads up-front.
|
||||
@ -307,10 +323,13 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
to allocate, and will not be left with a half-functional thread pool.
|
||||
*/
|
||||
if (new_count &&
|
||||
!(new_list= (rpl_parallel_thread **)my_malloc(new_count*sizeof(*new_list),
|
||||
MYF(MY_WME))))
|
||||
!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
|
||||
&new_list, new_count*sizeof(*new_list),
|
||||
&rpt_array, new_count*sizeof(*rpt_array),
|
||||
NULL))
|
||||
{
|
||||
my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list))));
|
||||
my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) +
|
||||
new_count*sizeof(*rpt_array))));
|
||||
goto err;;
|
||||
}
|
||||
|
||||
@ -318,28 +337,16 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
{
|
||||
pthread_t th;
|
||||
|
||||
if (!(new_list[i]= (rpl_parallel_thread *)my_malloc(sizeof(*(new_list[i])),
|
||||
MYF(MY_WME))))
|
||||
{
|
||||
my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*(new_list[i])));
|
||||
goto err;
|
||||
}
|
||||
new_list[i]= &rpt_array[i];
|
||||
new_list[i]->delay_start= true;
|
||||
new_list[i]->running= false;
|
||||
new_list[i]->stop= false;
|
||||
new_list[i]->free= false;
|
||||
mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
|
||||
MY_MUTEX_INIT_SLOW);
|
||||
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
|
||||
new_list[i]->pool= pool;
|
||||
new_list[i]->current_entry= NULL;
|
||||
new_list[i]->event_queue= NULL;
|
||||
new_list[i]->last_in_queue= NULL;
|
||||
if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL,
|
||||
handle_rpl_parallel_thread, new_list[i]))
|
||||
{
|
||||
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
||||
my_free(new_list[i]);
|
||||
goto err;
|
||||
}
|
||||
new_list[i]->next= new_free_list;
|
||||
@ -364,6 +371,13 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
mysql_mutex_unlock(&LOCK_active_mi);
|
||||
}
|
||||
|
||||
/*
|
||||
Grab each old thread in turn, and signal it to stop.
|
||||
|
||||
Note that since we require all replication threads to be stopped before
|
||||
changing the parallel replication worker thread pool, all the threads will
|
||||
be already idle and will terminate immediately.
|
||||
*/
|
||||
for (i= 0; i < pool->count; ++i)
|
||||
{
|
||||
rpl_parallel_thread *rpt= pool->get_thread(NULL);
|
||||
@ -381,7 +395,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||
mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
|
||||
mysql_cond_destroy(&rpt->COND_rpl_thread);
|
||||
my_free(rpt);
|
||||
}
|
||||
|
||||
my_free(pool->threads);
|
||||
@ -409,7 +422,6 @@ err:
|
||||
{
|
||||
while (new_free_list)
|
||||
{
|
||||
rpl_parallel_thread *next= new_free_list->next;
|
||||
mysql_mutex_lock(&new_free_list->LOCK_rpl_thread);
|
||||
new_free_list->delay_start= false;
|
||||
new_free_list->stop= true;
|
||||
@ -421,8 +433,7 @@ err:
|
||||
mysql_cond_wait(&new_free_list->COND_rpl_thread,
|
||||
&new_free_list->LOCK_rpl_thread);
|
||||
mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread);
|
||||
my_free(new_free_list);
|
||||
new_free_list= next;
|
||||
new_free_list= new_free_list->next;
|
||||
}
|
||||
my_free(new_list);
|
||||
}
|
||||
@ -471,6 +482,12 @@ rpl_parallel_thread_pool::destroy()
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Wait for a worker thread to become idle. When one does, grab the thread for
|
||||
our use and return it.
|
||||
|
||||
Note that we return with the worker threads's LOCK_rpl_thread mutex locked.
|
||||
*/
|
||||
struct rpl_parallel_thread *
|
||||
rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
|
||||
{
|
||||
@ -571,7 +588,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
|
||||
rpl_parallel_entry *e;
|
||||
rpl_parallel_thread *cur_thread;
|
||||
rpl_parallel_thread::queued_event *qev;
|
||||
rpl_group_info *rgi;
|
||||
rpl_group_info *rgi= NULL;
|
||||
Relay_log_info *rli= serial_rgi->rli;
|
||||
enum Log_event_type typ;
|
||||
|
||||
@ -596,6 +613,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
|
||||
event_group_new_gtid(rgi, gtid_ev))
|
||||
{
|
||||
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
||||
delete rgi;
|
||||
return true;
|
||||
}
|
||||
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
|
||||
@ -622,14 +640,33 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Check if we already have a worker thread for this entry. */
|
||||
/*
|
||||
Check if we already have a worker thread for this entry.
|
||||
|
||||
We continue to queue more events up for the worker thread while it is
|
||||
still executing the first ones, to be able to start executing a large
|
||||
event group without having to wait for the end to be fetched from the
|
||||
master. And we continue to queue up more events after the first group,
|
||||
avoiding the overhead of worker threads constantly entering and
|
||||
leaving the worker thread free list.
|
||||
|
||||
But if the worker thread is idle at any point, it may return to the
|
||||
idle list or be servicing a different request. So check this, and
|
||||
allocate a new thread if the old one is no longer processing for us.
|
||||
*/
|
||||
cur_thread= e->rpl_thread;
|
||||
if (cur_thread)
|
||||
{
|
||||
mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
|
||||
if (cur_thread->current_entry != e)
|
||||
{
|
||||
/* Not ours anymore, we need to grab a new one. */
|
||||
/*
|
||||
The worker thread became idle, and returned to the free list and
|
||||
possibly was allocated to a different request. This also means
|
||||
that everything previously queued has already been executed, else
|
||||
the worker thread would not have become idle. So we should
|
||||
allocate a new worker thread.
|
||||
*/
|
||||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
|
||||
e->rpl_thread= cur_thread= NULL;
|
||||
}
|
||||
@ -682,6 +719,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
|
||||
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
|
||||
Same for events not preceeded by GTID (we should not see those normally,
|
||||
but they might be from an old master).
|
||||
|
||||
The varuable `current' is NULL for the case where the master did not
|
||||
have GTID, like a MariaDB 5.5 or MySQL master.
|
||||
*/
|
||||
qev->rgi= serial_rgi;
|
||||
rpt_handle_event(qev, NULL);
|
||||
@ -719,8 +759,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
|
||||
else
|
||||
cur_thread->event_queue= qev;
|
||||
cur_thread->last_in_queue= qev;
|
||||
mysql_cond_signal(&cur_thread->COND_rpl_thread);
|
||||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
|
||||
mysql_cond_signal(&cur_thread->COND_rpl_thread);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
@ -13,7 +13,6 @@ struct rpl_parallel_thread {
|
||||
bool delay_start;
|
||||
bool running;
|
||||
bool stop;
|
||||
bool free;
|
||||
mysql_mutex_t LOCK_rpl_thread;
|
||||
mysql_cond_t COND_rpl_thread;
|
||||
struct rpl_parallel_thread *next; /* For free list. */
|
||||
|
@ -5631,8 +5631,8 @@ wait_for_commit::wakeup()
|
||||
*/
|
||||
mysql_mutex_lock(&LOCK_wait_commit);
|
||||
waiting_for_commit= false;
|
||||
mysql_cond_signal(&COND_wait_commit);
|
||||
mysql_mutex_unlock(&LOCK_wait_commit);
|
||||
mysql_cond_signal(&COND_wait_commit);
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user