mirror of
https://github.com/MariaDB/server.git
synced 2025-08-07 00:04:31 +03:00
10.0-base merge
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
#include "rpl_parallel.h"
|
||||
#include "slave.h"
|
||||
#include "rpl_mi.h"
|
||||
#include "debug_sync.h"
|
||||
|
||||
|
||||
/*
|
||||
@@ -24,7 +25,7 @@ static int
|
||||
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
|
||||
struct rpl_parallel_thread *rpt)
|
||||
{
|
||||
int err __attribute__((unused));
|
||||
int err;
|
||||
rpl_group_info *rgi= qev->rgi;
|
||||
Relay_log_info *rli= rgi->rli;
|
||||
THD *thd= rgi->thd;
|
||||
@@ -142,7 +143,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
|
||||
if (err)
|
||||
wfc->unregister_wait_for_prior_commit();
|
||||
else
|
||||
wfc->wait_for_prior_commit();
|
||||
err= wfc->wait_for_prior_commit(thd);
|
||||
thd->wait_for_commit_ptr= NULL;
|
||||
|
||||
/*
|
||||
@@ -172,6 +173,18 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
|
||||
{
|
||||
rgi->is_error= true;
|
||||
rgi->cleanup_context(thd, true);
|
||||
rgi->rli->abort_slave= true;
|
||||
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
|
||||
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
|
||||
rgi->rli->relay_log.signal_update();
|
||||
}
|
||||
|
||||
|
||||
pthread_handler_t
|
||||
handle_rpl_parallel_thread(void *arg)
|
||||
{
|
||||
@@ -207,6 +220,7 @@ handle_rpl_parallel_thread(void *arg)
|
||||
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
|
||||
set_slave_thread_options(thd);
|
||||
thd->client_capabilities = CLIENT_LOCAL_FILES;
|
||||
thd->net.reading_or_writing= 0;
|
||||
thd_proc_info(thd, "Waiting for work from main SQL threads");
|
||||
thd->set_time();
|
||||
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
|
||||
@@ -284,21 +298,42 @@ handle_rpl_parallel_thread(void *arg)
|
||||
wait_start_sub_id= rgi->wait_start_sub_id;
|
||||
if (wait_for_sub_id || wait_start_sub_id)
|
||||
{
|
||||
bool did_enter_cond= false;
|
||||
PSI_stage_info old_stage;
|
||||
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
if (wait_start_sub_id)
|
||||
{
|
||||
while (wait_start_sub_id > entry->last_committed_sub_id)
|
||||
thd->ENTER_COND(&entry->COND_parallel_entry,
|
||||
&entry->LOCK_parallel_entry,
|
||||
&stage_waiting_for_prior_transaction_to_commit,
|
||||
&old_stage);
|
||||
did_enter_cond= true;
|
||||
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
|
||||
while (wait_start_sub_id > entry->last_committed_sub_id &&
|
||||
!thd->check_killed())
|
||||
mysql_cond_wait(&entry->COND_parallel_entry,
|
||||
&entry->LOCK_parallel_entry);
|
||||
if (wait_start_sub_id > entry->last_committed_sub_id)
|
||||
{
|
||||
/* The thread got a kill signal. */
|
||||
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
|
||||
thd->send_kill_message();
|
||||
slave_output_error_info(rgi->rli, thd);
|
||||
signal_error_to_sql_driver_thread(thd, rgi);
|
||||
}
|
||||
rgi->wait_start_sub_id= 0; /* No need to check again. */
|
||||
}
|
||||
rgi->wait_start_sub_id= 0; /* No need to check again. */
|
||||
if (wait_for_sub_id > entry->last_committed_sub_id)
|
||||
{
|
||||
wait_for_commit *waitee=
|
||||
&rgi->wait_commit_group_info->commit_orderer;
|
||||
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
|
||||
}
|
||||
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
||||
if (did_enter_cond)
|
||||
thd->EXIT_COND(&old_stage);
|
||||
else
|
||||
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
||||
}
|
||||
|
||||
if(thd->wait_for_commit_ptr)
|
||||
@@ -341,10 +376,8 @@ handle_rpl_parallel_thread(void *arg)
|
||||
|
||||
if (err)
|
||||
{
|
||||
rgi->is_error= true;
|
||||
slave_output_error_info(rgi->rli, thd);
|
||||
rgi->cleanup_context(thd, true);
|
||||
rgi->rli->abort_slave= true;
|
||||
signal_error_to_sql_driver_thread(thd, rgi);
|
||||
}
|
||||
if (end_of_group)
|
||||
{
|
||||
@@ -353,6 +386,7 @@ handle_rpl_parallel_thread(void *arg)
|
||||
&rgi->commit_orderer);
|
||||
delete rgi;
|
||||
group_rgi= rgi= NULL;
|
||||
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
|
||||
}
|
||||
|
||||
events= next;
|
||||
@@ -383,11 +417,9 @@ handle_rpl_parallel_thread(void *arg)
|
||||
half-processed event group.
|
||||
*/
|
||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||
group_rgi->is_error= true;
|
||||
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
|
||||
group_rgi->parallel_entry, &group_rgi->commit_orderer);
|
||||
group_rgi->cleanup_context(thd, true);
|
||||
group_rgi->rli->abort_slave= true;
|
||||
signal_error_to_sql_driver_thread(thd, group_rgi);
|
||||
in_event_group= false;
|
||||
delete group_rgi;
|
||||
group_rgi= NULL;
|
||||
@@ -752,6 +784,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
Relay_log_info *rli= serial_rgi->rli;
|
||||
enum Log_event_type typ;
|
||||
bool is_group_event;
|
||||
bool did_enter_cond= false;
|
||||
PSI_stage_info old_stage;
|
||||
|
||||
/* ToDo: what to do with this lock?!? */
|
||||
mysql_mutex_unlock(&rli->data_lock);
|
||||
@@ -766,6 +800,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
sql_thread_stopping= true;
|
||||
if (sql_thread_stopping)
|
||||
{
|
||||
delete ev;
|
||||
/* QQ: Need a better comment why we return false here */
|
||||
return false;
|
||||
}
|
||||
@@ -774,6 +809,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
MYF(0))))
|
||||
{
|
||||
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
||||
delete ev;
|
||||
return true;
|
||||
}
|
||||
qev->ev= ev;
|
||||
@@ -796,6 +832,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
{
|
||||
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
||||
delete rgi;
|
||||
my_free(qev);
|
||||
delete ev;
|
||||
return true;
|
||||
}
|
||||
rgi->is_parallel_exec = true;
|
||||
@@ -813,6 +851,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
However, the commit of this event must wait for the commit of the prior
|
||||
event, to preserve binlog commit order and visibility across all
|
||||
servers in the replication hierarchy.
|
||||
|
||||
In addition, we must not start executing this event until we have
|
||||
finished the previous collection of event groups that group-committed
|
||||
together; we use rgi->wait_start_sub_id to control this.
|
||||
*/
|
||||
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
|
||||
rgi->wait_commit_sub_id= e->current_sub_id;
|
||||
@@ -859,6 +901,21 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
}
|
||||
else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
|
||||
break; // The thread is ready to queue into
|
||||
else if (rli->sql_driver_thd->check_killed())
|
||||
{
|
||||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
|
||||
my_error(ER_CONNECTION_KILLED, MYF(0));
|
||||
delete rgi;
|
||||
my_free(qev);
|
||||
delete ev;
|
||||
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
|
||||
{
|
||||
debug_sync_set_action(rli->sql_driver_thd,
|
||||
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
|
||||
};);
|
||||
slave_output_error_info(rli, rli->sql_driver_thd);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
@@ -866,6 +923,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
use for queuing events, so wait for the thread to consume some
|
||||
of its queue.
|
||||
*/
|
||||
if (!did_enter_cond)
|
||||
{
|
||||
rli->sql_driver_thd->ENTER_COND(&cur_thread->COND_rpl_thread,
|
||||
&cur_thread->LOCK_rpl_thread,
|
||||
&stage_waiting_for_room_in_worker_thread, &old_stage);
|
||||
did_enter_cond= true;
|
||||
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
|
||||
{
|
||||
debug_sync_set_action(rli->sql_driver_thd,
|
||||
STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
|
||||
};);
|
||||
}
|
||||
mysql_cond_wait(&cur_thread->COND_rpl_thread,
|
||||
&cur_thread->LOCK_rpl_thread);
|
||||
}
|
||||
@@ -1015,7 +1084,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
*/
|
||||
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
|
||||
cur_thread->enqueue(qev);
|
||||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
|
||||
if (did_enter_cond)
|
||||
rli->sql_driver_thd->EXIT_COND(&old_stage);
|
||||
else
|
||||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
|
||||
mysql_cond_signal(&cur_thread->COND_rpl_thread);
|
||||
|
||||
return false;
|
||||
|
Reference in New Issue
Block a user