mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
Merge 10.0 into 10.1
This commit is contained in:
@@ -1313,39 +1313,16 @@ handle_rpl_parallel_thread(void *arg)
|
||||
*/
|
||||
rpt->batch_free();
|
||||
|
||||
for (;;)
|
||||
if ((events= rpt->event_queue) != NULL)
|
||||
{
|
||||
if ((events= rpt->event_queue) != NULL)
|
||||
{
|
||||
/*
|
||||
Take next group of events from the replication pool.
|
||||
This is faster than having to wakeup the pool manager thread to give
|
||||
us a new event.
|
||||
*/
|
||||
rpt->dequeue1(events);
|
||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||
goto more_events;
|
||||
}
|
||||
if (!rpt->pause_for_ftwrl ||
|
||||
(in_event_group && !group_rgi->parallel_entry->force_abort))
|
||||
break;
|
||||
/*
|
||||
We are currently in the delicate process of pausing parallel
|
||||
replication while FLUSH TABLES WITH READ LOCK is starting. We must
|
||||
not de-allocate the thread (setting rpt->current_owner= NULL) until
|
||||
rpl_unpause_after_ftwrl() has woken us up.
|
||||
Take next group of events from the replication pool.
|
||||
This is faster than having to wakeup the pool manager thread to give
|
||||
us a new event.
|
||||
*/
|
||||
mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry);
|
||||
rpt->dequeue1(events);
|
||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||
if (rpt->pause_for_ftwrl)
|
||||
mysql_cond_wait(&rpt->current_entry->COND_parallel_entry,
|
||||
&rpt->current_entry->LOCK_parallel_entry);
|
||||
mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry);
|
||||
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
||||
/*
|
||||
Now loop to check again for more events available, since we released
|
||||
and re-aquired the LOCK_rpl_thread mutex.
|
||||
*/
|
||||
goto more_events;
|
||||
}
|
||||
|
||||
rpt->inuse_relaylog_refcount_update();
|
||||
@@ -1372,11 +1349,36 @@ handle_rpl_parallel_thread(void *arg)
|
||||
}
|
||||
if (!in_event_group)
|
||||
{
|
||||
/* If we are in a FLUSH TABLES FOR READ LOCK, wait for it */
|
||||
while (rpt->current_entry && rpt->pause_for_ftwrl)
|
||||
{
|
||||
/*
|
||||
We are currently in the delicate process of pausing parallel
|
||||
replication while FLUSH TABLES WITH READ LOCK is starting. We must
|
||||
not de-allocate the thread (setting rpt->current_owner= NULL) until
|
||||
rpl_unpause_after_ftwrl() has woken us up.
|
||||
*/
|
||||
rpl_parallel_entry *e= rpt->current_entry;
|
||||
/*
|
||||
Ensure that we will unblock rpl_pause_for_ftrwl()
|
||||
e->pause_sub_id may be LONGLONG_MAX if rpt->current_entry has changed
|
||||
*/
|
||||
DBUG_ASSERT(e->pause_sub_id == (uint64)ULONGLONG_MAX ||
|
||||
e->last_committed_sub_id >= e->pause_sub_id);
|
||||
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||
if (rpt->pause_for_ftwrl)
|
||||
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
||||
}
|
||||
|
||||
rpt->current_owner= NULL;
|
||||
/* Tell wait_for_done() that we are done, if it is waiting. */
|
||||
if (likely(rpt->current_entry) &&
|
||||
unlikely(rpt->current_entry->force_abort))
|
||||
mysql_cond_broadcast(&rpt->COND_rpl_thread_stop);
|
||||
|
||||
rpt->current_entry= NULL;
|
||||
if (!rpt->stop)
|
||||
rpt->pool->release_thread(rpt);
|
||||
@@ -1416,10 +1418,24 @@ dealloc_gco(group_commit_orderer *gco)
|
||||
my_free(gco);
|
||||
}
|
||||
|
||||
/**
|
||||
Change thread count for global parallel worker threads
|
||||
|
||||
@param pool parallel thread pool
|
||||
@param new_count Number of threads to be in pool. 0 in shutdown
|
||||
@param force Force thread count to new_count even if slave
|
||||
threads are running
|
||||
|
||||
By default we don't resize pool of there are running threads.
|
||||
However during shutdown we will always do it.
|
||||
This is needed as any_slave_sql_running() returns 1 during shutdown
|
||||
as we don't want to access master_info while
|
||||
Master_info_index::free_connections are running.
|
||||
*/
|
||||
|
||||
static int
|
||||
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
uint32 new_count)
|
||||
uint32 new_count, bool force)
|
||||
{
|
||||
uint32 i;
|
||||
rpl_parallel_thread **old_list= NULL;
|
||||
@@ -1431,6 +1447,28 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
if ((res= pool_mark_busy(pool, current_thd)))
|
||||
return res;
|
||||
|
||||
/* Protect against parallel pool resizes */
|
||||
if (pool->count == new_count)
|
||||
{
|
||||
pool_mark_not_busy(pool);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
If we are about to delete pool, do an extra check that there are no new
|
||||
slave threads running since we marked pool busy
|
||||
*/
|
||||
if (!new_count && !force)
|
||||
{
|
||||
if (any_slave_sql_running())
|
||||
{
|
||||
DBUG_PRINT("warning",
|
||||
("SQL threads running while trying to reset parallel pool"));
|
||||
pool_mark_not_busy(pool);
|
||||
return 0; // Ok to not resize pool
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Allocate the new list of threads up-front.
|
||||
That way, if we fail half-way, we only need to free whatever we managed
|
||||
@@ -1444,7 +1482,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
{
|
||||
my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) +
|
||||
new_count*sizeof(*rpt_array))));
|
||||
goto err;;
|
||||
goto err;
|
||||
}
|
||||
|
||||
for (i= 0; i < new_count; ++i)
|
||||
@@ -1569,12 +1607,26 @@ err:
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
Deactivate the parallel replication thread pool, if there are now no more
|
||||
SQL threads running.
|
||||
*/
|
||||
|
||||
int rpl_parallel_resize_pool_if_no_slaves(void)
|
||||
{
|
||||
/* master_info_index is set to NULL on shutdown */
|
||||
if (opt_slave_parallel_threads > 0 && !any_slave_sql_running())
|
||||
return rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
|
||||
{
|
||||
if (!pool->count)
|
||||
return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads);
|
||||
return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads,
|
||||
0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -1582,7 +1634,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
|
||||
int
|
||||
rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
|
||||
{
|
||||
return rpl_parallel_change_thread_count(pool, 0);
|
||||
return rpl_parallel_change_thread_count(pool, 0, 0);
|
||||
}
|
||||
|
||||
|
||||
@@ -1860,7 +1912,7 @@ rpl_parallel_thread_pool::destroy()
|
||||
{
|
||||
if (!inited)
|
||||
return;
|
||||
rpl_parallel_change_thread_count(this, 0);
|
||||
rpl_parallel_change_thread_count(this, 0, 1);
|
||||
mysql_mutex_destroy(&LOCK_rpl_thread_pool);
|
||||
mysql_cond_destroy(&COND_rpl_thread_pool);
|
||||
inited= false;
|
||||
@@ -1879,6 +1931,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
|
||||
{
|
||||
rpl_parallel_thread *rpt;
|
||||
|
||||
DBUG_ASSERT(count > 0);
|
||||
mysql_mutex_lock(&LOCK_rpl_thread_pool);
|
||||
while (unlikely(busy) || !(rpt= free_list))
|
||||
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
|
||||
@@ -2107,6 +2160,11 @@ rpl_parallel::find(uint32 domain_id)
|
||||
return e;
|
||||
}
|
||||
|
||||
/**
|
||||
Wait until all sql worker threads has stopped processing
|
||||
|
||||
This is called when sql thread has been killed/stopped
|
||||
*/
|
||||
|
||||
void
|
||||
rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
|
||||
|
Reference in New Issue
Block a user