1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-07 00:04:31 +03:00

Merge 10.0.14 into 10.1

This commit is contained in:
Sergei Golubchik
2014-10-15 12:59:13 +02:00
2115 changed files with 87968 additions and 80173 deletions

View File

@@ -4,18 +4,8 @@
#include "rpl_mi.h"
#include "debug_sync.h"
/*
Code for optional parallel execution of replicated events on the slave.
ToDo list:
- Retry of failed transactions is not yet implemented for the parallel case.
- All the waits (eg. in struct wait_for_commit and in
rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
everything needs to be correctly rolled back and stopped in all threads,
to ensure a consistent slave replication state.
*/
struct rpl_parallel_thread_pool global_rpl_thread_pool;
@@ -31,20 +21,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
Log_event *ev;
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
ev= qev->ev;
thd->rgi_slave= rgi;
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
ev->thd= thd;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
qev->ev->thd= thd;
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
rgi->event_relay_log_pos= qev->event_relay_log_pos;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
mysql_mutex_lock(&rli->data_lock);
/* Mutex will be released in apply_event_and_update_pos(). */
err= apply_event_and_update_pos(ev, thd, rgi, rpt);
thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
@@ -58,6 +50,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{
int cmp;
Relay_log_info *rli;
rpl_parallel_entry *e;
/*
Events that are not part of an event group, such as Format Description,
Stop, GTID List and such, are executed directly in the driver SQL thread,
@@ -68,6 +62,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
if ((thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions)
return;
/* Do not update position if an earlier event group caused an error abort. */
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
e= qev->entry_for_queued;
if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
return;
rli= qev->rgi->rli;
mysql_mutex_lock(&rli->data_lock);
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
@@ -165,6 +166,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
thd->clear_error();
thd->reset_killed();
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
}
@@ -197,6 +199,290 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
}
static void
register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
rpl_parallel_entry *entry)
{
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
{
/*
Register that the commit of this event group must wait for the
commit of the previous event group to complete before it may
complete itself, so that we preserve commit order.
*/
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
}
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
{
if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
rgi->retry_event_count == 4)
{
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
my_error(ER_LOCK_DEADLOCK, MYF(0));
return 1;
}
return 0;
}
#endif
/*
If we detect a deadlock due to eg. storage engine locks that conflict with
the fixed commit order, then the later transaction will be killed
asynchroneously to allow the former to complete its commit.
In this case, we convert the 'killed' error into a deadlock error, and retry
the later transaction. */
static void
convert_kill_to_deadlock_error(rpl_group_info *rgi)
{
THD *thd= rgi->thd;
int err_code;
if (!thd->get_stmt_da()->is_error())
return;
err_code= thd->get_stmt_da()->sql_errno();
if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
rgi->killed_for_retry)
{
thd->clear_error();
my_error(ER_LOCK_DEADLOCK, MYF(0));
rgi->killed_for_retry= false;
thd->reset_killed();
}
}
static bool
is_group_ending(Log_event *ev, Log_event_type event_type)
{
return event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(((Query_log_event *)ev)->is_commit() ||
((Query_log_event *)ev)->is_rollback()));
}
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev)
{
IO_CACHE rlog;
LOG_INFO linfo;
File fd= (File)-1;
const char *errmsg= NULL;
inuse_relaylog *ir= rgi->relay_log;
uint64 event_count;
uint64 events_to_execute= rgi->retry_event_count;
Relay_log_info *rli= rgi->rli;
int err;
ulonglong cur_offset, old_offset;
char log_name[FN_REFLEN];
THD *thd= rgi->thd;
rpl_parallel_entry *entry= rgi->parallel_entry;
ulong retries= 0;
do_retry:
event_count= 0;
err= 0;
/*
If we already started committing before getting the deadlock (or other
error) that caused us to need to retry, we have already signalled
subsequent transactions that we have started committing. This is
potentially a problem, as now we will rollback, and if subsequent
transactions would start to execute now, they could see an unexpected
state of the database and get eg. key not found or duplicate key error.
However, to get a deadlock in the first place, there must have been
another earlier transaction that is waiting for us. Thus that other
transaction has _not_ yet started to commit, and any subsequent
transactions will still be waiting at this point.
So here, we decrement back the count of transactions that started
committing (if we already incremented it), undoing the effect of an
earlier mark_start_commit(). Then later, when the retry succeeds and we
commit again, we can do a new mark_start_commit() and eventually wake up
subsequent transactions at the proper time.
We need to do the unmark before the rollback, to be sure that the
transaction we deadlocked with will not signal that it started to commit
until after the unmark.
*/
rgi->unmark_start_commit();
/*
We might get the deadlock error that causes the retry during commit, while
sitting in wait_for_prior_commit(). If this happens, we will have a
pending error in the wait_for_commit object. So clear this by
unregistering (and later re-registering) the wait.
*/
if(thd->wait_for_commit_ptr)
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
rgi->cleanup_context(thd, 1);
/*
If we retry due to a deadlock kill that occured during the commit step, we
might have already updated (but not committed) an update of table
mysql.gtid_slave_pos, and cleared the gtid_pending flag. Now we have
rolled back any such update, so we must set the gtid_pending flag back to
true so that we will do a new update when/if we succeed with the retry.
*/
rgi->gtid_pending= true;
mysql_mutex_lock(&rli->data_lock);
++rli->retried_trans;
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
mysql_mutex_lock(&entry->LOCK_parallel_entry);
register_wait_for_prior_event_group_commit(rgi, entry);
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
err= 1;
goto err;
}
cur_offset= rgi->retry_start_offset;
my_b_seek(&rlog, cur_offset);
do
{
Log_event_type event_type;
Log_event *ev;
rpl_parallel_thread::queued_event *qev;
/* The loop is here so we can try again the next relay log file on EOF. */
for (;;)
{
old_offset= cur_offset;
ev= Log_event::read_log_event(&rlog, 0,
rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
opt_slave_sql_verify_checksum);
cur_offset= my_b_tell(&rlog);
if (ev)
break;
if (rlog.error < 0)
{
errmsg= "slave SQL thread aborted because of I/O error";
err= 1;
goto err;
}
if (rlog.error > 0)
{
sql_print_error("Slave SQL thread: I/O error reading "
"event(errno: %d cur_log->error: %d)",
my_errno, rlog.error);
errmsg= "Aborting slave SQL thread because of partial event read";
err= 1;
goto err;
}
/* EOF. Move to the next relay log. */
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
fd= (File)-1;
/* Find the next relay log file. */
if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
(err= rli->relay_log.find_next_log(&linfo, 1)))
{
char buff[22];
sql_print_error("next log error: %d offset: %s log: %s",
err,
llstr(linfo.index_file_offset, buff),
log_name);
goto err;
}
strmake_buf(log_name ,linfo.log_file_name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
err= 1;
goto err;
}
/* Loop to try again on the new log file. */
}
event_type= ev->get_type_code();
if (!Log_event::is_group_event(event_type))
{
delete ev;
continue;
}
ev->thd= thd;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
cur_offset - old_offset);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
if (!qev)
{
delete ev;
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto err;
}
if (is_group_ending(ev, event_type))
rgi->mark_start_commit();
err= rpt_handle_event(qev, rpt);
++event_count;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_qev(qev);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
delete_or_keep_event_post_apply(rgi, event_type, ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd))
{
++retries;
if (retries < slave_trans_retries)
{
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
fd= (File)-1;
goto do_retry;
}
sql_print_error("Slave worker thread retried transaction %lu time(s) "
"in vain, giving up. Consider raising the value of "
"the slave_transaction_retries variable.",
slave_trans_retries);
}
goto err;
}
} while (event_count < events_to_execute);
err:
if (fd >= 0)
{
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
}
if (errmsg)
sql_print_error("Error reading relay log event: %s", errmsg);
return err;
}
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -215,6 +501,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
inuse_relaylog *last_ir;
uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -244,39 +532,6 @@ handle_rpl_parallel_thread(void *arg)
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
thd->system_thread_info.rpl_sql_info= &sql_info;
/*
For now, we need to run the replication parallel worker threads in
READ COMMITTED. This is needed because gap locks are not symmetric.
For example, a gap lock from a DELETE blocks an insert intention lock,
but not vice versa. So an INSERT followed by DELETE can group commit
on the master, but if we are unlucky with thread scheduling we can
then deadlock on the slave because the INSERT ends up waiting for a
gap lock from the DELETE (and the DELETE in turn waits for the INSERT
in wait_for_prior_commit()). See also MDEV-5914.
It should be mostly safe to run in READ COMMITTED in the slave anyway.
The commit order is already fixed from on the master, so we do not
risk logging into the binlog in an incorrect order between worker
threads (one that would cause different results if executed on a
lower-level slave that uses this slave as a master). The only
potential problem is with transactions run in a different master
connection (using multi-source replication), or run directly on the
slave by an application; when using READ COMMITTED we are not
guaranteed serialisability of binlogged statements.
In practice, this is unlikely to be an issue. In GTID mode, such
parallel transactions from multi-source or application must in any
case use a different replication domain, in which case binlog order
by definition must be independent between the different domain. Even
in non-GTID mode, normally one will assume that the external
transactions are not conflicting with those applied by the slave, so
that isolation level should make no difference. It would be rather
strange if the result of applying query events from one master would
depend on the timing and nature of other queries executed from
different multi-source connections or done directly on the slave by
an application. Still, something to be aware of.
*/
thd->variables.tx_isolation= ISO_READ_COMMITTED;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
@@ -323,7 +578,7 @@ handle_rpl_parallel_thread(void *arg)
bool end_of_group, group_ending;
total_event_size+= events->event_size;
if (!events->ev)
if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
{
handle_queued_pos_update(thd, events);
events->next= qevs_to_free;
@@ -331,8 +586,33 @@ handle_rpl_parallel_thread(void *arg)
events= next;
continue;
}
else if (events->typ ==
rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
{
if (in_event_group)
{
/*
Master restarted (crashed) in the middle of an event group.
So we need to roll back and discard that event group.
*/
group_rgi->cleanup_context(thd, 1);
in_event_group= false;
finish_event_group(thd, group_rgi->gtid_sub_id,
events->entry_for_queued, group_rgi);
group_rgi= rgi;
group_rgi->next= rgis_to_free;
rgis_to_free= group_rgi;
thd->rgi_slave= group_rgi= NULL;
}
events->next= qevs_to_free;
qevs_to_free= events;
events= next;
continue;
}
DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
@@ -341,7 +621,6 @@ handle_rpl_parallel_thread(void *arg)
PSI_stage_info old_stage;
uint64 wait_count;
thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -352,9 +631,7 @@ handle_rpl_parallel_thread(void *arg)
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
/* Save this, as it gets cleared when the event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id;
rgi->thd= thd;
/*
@@ -388,7 +665,7 @@ handle_rpl_parallel_thread(void *arg)
{
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
slave_output_error_info(rgi->rli, thd);
slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, 1);
/*
Even though we were killed, we need to continue waiting for the
@@ -430,17 +707,9 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
skip_event_group= true;
else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
{
/*
Register that the commit of this event group must wait for the
commit of the previous event group to complete before it may
complete itself, so that we preserve commit order.
*/
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
else
register_wait_for_prior_event_group_commit(rgi, entry);
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
&did_enter_cond, &old_stage);
@@ -467,7 +736,7 @@ handle_rpl_parallel_thread(void *arg)
if (res < 0)
{
/* Error. */
slave_output_error_info(rgi->rli, thd);
slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, 1);
}
else if (!res)
@@ -482,11 +751,8 @@ handle_rpl_parallel_thread(void *arg)
}
}
group_ending= event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(((Query_log_event *)events->ev)->is_commit() ||
((Query_log_event *)events->ev)->is_rollback()));
if (group_ending)
group_ending= is_group_ending(events->ev, event_type);
if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
rgi->mark_start_commit();
@@ -498,24 +764,42 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
if (!rgi->worker_error && !skip_event_group)
if (likely(!rgi->worker_error) && !skip_event_group)
{
++rgi->retry_event_count;
err= rpt_handle_event(events, rpt);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd) && slave_trans_retries > 0)
err= retry_event_group(rgi, rpt, events);
}
}
else
{
delete events->ev;
err= thd->wait_for_prior_commit();
}
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
group_ending);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
events->next= qevs_to_free;
qevs_to_free= events;
if (unlikely(err) && !rgi->worker_error)
if (unlikely(err))
{
slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi, err);
if (!rgi->worker_error)
{
slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, err);
}
thd->reset_killed();
}
if (end_of_group)
{
@@ -523,7 +807,7 @@ handle_rpl_parallel_thread(void *arg)
finish_event_group(thd, event_gtid_sub_id, entry, rgi);
rgi->next= rgis_to_free;
rgis_to_free= rgi;
group_rgi= rgi= NULL;
thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@@ -548,12 +832,34 @@ handle_rpl_parallel_thread(void *arg)
rpt->free_rgi(rgis_to_free);
rgis_to_free= next;
}
last_ir= NULL;
accumulated_ir_count= 0;
while (qevs_to_free)
{
rpl_parallel_thread::queued_event *next= qevs_to_free->next;
inuse_relaylog *ir= qevs_to_free->ir;
/* Batch up refcount update to reduce use of synchronised operations. */
if (last_ir != ir)
{
if (last_ir)
{
my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
accumulated_ir_count= 0;
}
last_ir= ir;
}
++accumulated_ir_count;
rpt->free_qev(qevs_to_free);
qevs_to_free= next;
}
if (last_ir)
{
my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
}
if ((events= rpt->event_queue) != NULL)
{
@@ -584,7 +890,7 @@ handle_rpl_parallel_thread(void *arg)
in_event_group= false;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi);
group_rgi= NULL;
thd->rgi_slave= group_rgi= NULL;
skip_event_group= false;
}
if (!in_event_group)
@@ -802,8 +1108,7 @@ err:
rpl_parallel_thread::queued_event *
rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
Relay_log_info *rli)
rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
{
queued_event *qev;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -814,9 +1119,21 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
return NULL;
}
qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT;
qev->ev= ev;
qev->event_size= event_size;
qev->next= NULL;
return qev;
}
rpl_parallel_thread::queued_event *
rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
Relay_log_info *rli)
{
queued_event *qev= get_qev_common(ev, event_size);
if (!qev)
return NULL;
strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
qev->event_relay_log_pos= rli->event_relay_log_pos;
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
@@ -825,6 +1142,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
}
rpl_parallel_thread::queued_event *
rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
const char *relay_log_name,
ulonglong event_pos, ulonglong event_size)
{
queued_event *qev= get_qev_common(ev, event_size);
if (!qev)
return NULL;
qev->rgi= orig_qev->rgi;
strcpy(qev->event_relay_log_name, relay_log_name);
qev->event_relay_log_pos= event_pos;
qev->future_event_relay_log_pos= event_pos+event_size;
strcpy(qev->future_event_master_log_name,
orig_qev->future_event_master_log_name);
return qev;
}
void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{
@@ -836,7 +1171,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
rpl_group_info*
rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rpl_parallel_entry *e)
rpl_parallel_entry *e, ulonglong event_size)
{
rpl_group_info *rgi;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -864,6 +1199,10 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
return NULL;
}
rgi->parallel_entry= e;
rgi->relay_log= rli->last_inuse_relaylog;
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= false;
return rgi;
}
@@ -1018,10 +1357,11 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
if it is still available. Otherwise a new worker thread is allocated.
*/
rpl_parallel_thread *
rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage, bool reuse)
{
uint32 idx;
Relay_log_info *rli= rgi->rli;
rpl_parallel_thread *thr;
idx= rpl_thread_idx;
@@ -1066,7 +1406,7 @@ rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
};);
slave_output_error_info(rli, rli->sql_driver_thd);
slave_output_error_info(rgi, rli->sql_driver_thd);
return NULL;
}
else
@@ -1300,6 +1640,91 @@ rpl_parallel::workers_idle()
}
int
rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
Format_description_log_event *fdev)
{
uint32 idx;
rpl_parallel_thread *thr;
rpl_parallel_thread::queued_event *qev;
Relay_log_info *rli= rgi->rli;
/*
We only need to queue the server restart if we still have a thread working
on a (potentially partial) event group.
If the last thread we queued for has finished, then it cannot have any
partial event group that needs aborting.
Thus there is no need for the full complexity of choose_thread(). We only
need to check if we have a current worker thread, and queue for it if so.
*/
idx= rpl_thread_idx;
thr= rpl_threads[idx];
if (!thr)
return 0;
mysql_mutex_lock(&thr->LOCK_rpl_thread);
if (thr->current_owner != &rpl_threads[idx])
{
/* No active worker thread, so no need to queue the master restart. */
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
return 0;
}
if (!(qev= thr->get_qev(fdev, 0, rli)))
{
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
return 1;
}
qev->rgi= rgi;
qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART;
qev->entry_for_queued= this;
qev->ir= rli->last_inuse_relaylog;
++qev->ir->queued_count;
thr->enqueue(qev);
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
return 0;
}
int
rpl_parallel::wait_for_workers_idle(THD *thd)
{
uint32 i, max_i;
/*
The domain_hash is only accessed by the SQL driver thread, so it is safe
to iterate over without a lock.
*/
max_i= domain_hash.records;
for (i= 0; i < max_i; ++i)
{
bool active;
wait_for_commit my_orderer;
struct rpl_parallel_entry *e;
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
if ((active= (e->current_sub_id > e->last_committed_sub_id)))
{
wait_for_commit *waitee= &e->current_group_info->commit_orderer;
my_orderer.register_wait_for_prior_commit(waitee);
thd->wait_for_commit_ptr= &my_orderer;
}
mysql_mutex_unlock(&e->LOCK_parallel_entry);
if (active)
{
int err= my_orderer.wait_for_prior_commit(thd);
thd->wait_for_commit_ptr= NULL;
if (err)
return err;
}
}
return 0;
}
/*
This is used when we get an error during processing in do_event();
We will not queue any event to the thread, but we still need to wake it up
@@ -1367,6 +1792,33 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
if (typ == FORMAT_DESCRIPTION_EVENT)
{
Format_description_log_event *fdev=
static_cast<Format_description_log_event *>(ev);
if (fdev->created)
{
/*
This format description event marks a new binlog after a master server
restart. We are going to close all temporary tables to clean up any
possible left-overs after a prior master crash.
Thus we need to wait for all prior events to execute to completion,
in case they need access to any of the temporary tables.
We also need to notify the worker thread running the prior incomplete
event group (if any), as such event group signifies an incompletely
written group cut short by a master crash, and must be rolled back.
*/
if (current->queue_master_restart(serial_rgi, fdev) ||
wait_for_workers_idle(rli->sql_driver_thd))
{
delete ev;
return 1;
}
}
}
/*
Stop queueing additional event groups once the SQL thread is requested to
stop.
@@ -1390,15 +1842,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
uint32 domain_id;
if (likely(typ == GTID_EVENT))
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
0 : gtid_ev->domain_id);
}
else
domain_id= 0;
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
@@ -1417,7 +1863,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
instead re-use a thread that we queued for previously.
*/
cur_thread=
e->choose_thread(rli, &did_enter_cond, &old_stage, typ != GTID_EVENT);
e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
typ != GTID_EVENT);
if (!cur_thread)
{
/* This means we were killed. The error is already signalled. */
@@ -1437,7 +1884,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{
cur_thread->free_qev(qev);
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
@@ -1527,7 +1974,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1;
}
/*
Queue an empty event, so that the position will be updated in a
Queue a position update, so that the position will be updated in a
reasonable way relative to other events:
- If the currently executing events are queued serially for a single
@@ -1538,7 +1985,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
qev->ev= NULL;
qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
qev->entry_for_queued= e;
}
else
{
@@ -1549,6 +1997,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Queue the event for processing.
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
qev->ir= rli->last_inuse_relaylog;
++qev->ir->queued_count;
cur_thread->enqueue(qev);
unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
&did_enter_cond, &old_stage);