
Merge branch '10.0' into bb-10.1-merge

Conflicts:
	.bzrignore
	VERSION
	cmake/plugin.cmake
	debian/dist/Debian/control
	debian/dist/Ubuntu/control
	mysql-test/r/join_outer.result
	mysql-test/r/join_outer_jcl6.result
	mysql-test/r/null.result
	mysql-test/r/old-mode.result
	mysql-test/r/union.result
	mysql-test/t/join_outer.test
	mysql-test/t/null.test
	mysql-test/t/old-mode.test
	mysql-test/t/union.test
	packaging/rpm-oel/mysql.spec.in
	scripts/mysql_config.sh
	sql/ha_ndbcluster.cc
	sql/ha_ndbcluster_binlog.cc
	sql/ha_ndbcluster_cond.cc
	sql/item_cmpfunc.h
	sql/lock.cc
	sql/sql_select.cc
	sql/sql_show.cc
	sql/sql_update.cc
	sql/sql_yacc.yy
	storage/innobase/buf/buf0flu.cc
	storage/innobase/fil/fil0fil.cc
	storage/innobase/include/srv0srv.h
	storage/innobase/lock/lock0lock.cc
	storage/tokudb/CMakeLists.txt
	storage/xtradb/buf/buf0flu.cc
	storage/xtradb/fil/fil0fil.cc
	storage/xtradb/include/srv0srv.h
	storage/xtradb/lock/lock0lock.cc
	support-files/mysql.spec.sh
Author: Sergei Golubchik
Date:   2014-12-02 22:25:16 +01:00
2354 changed files with 594871 additions and 12560 deletions

sql/rpl_parallel.cc

@@ -8,6 +8,15 @@
 /*
   Code for optional parallel execution of replicated events on the slave.
 */
 
+/*
+  Maximum number of queued events to accumulate in a local free list, before
+  moving them to the global free list. There is additionally a limit of how
+  much to accumulate based on opt_slave_parallel_max_queued.
+*/
+#define QEV_BATCH_FREE  200
+
+
 struct rpl_parallel_thread_pool global_rpl_thread_pool;
 
 static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
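
Aside: the comment above describes the central idea of this part of the patch. Workers free their queued events onto a thread-local list and only occasionally splice that list into the shared free list under LOCK_rpl_thread. A minimal standalone C++ sketch of the same pattern follows; the names and types are generic illustrations, not the server's:

#include <mutex>

struct node { node *next; };

struct free_list_batcher {
  static constexpr int BATCH= 200; // plays the role of QEV_BATCH_FREE

  node *local_head= nullptr;       // thread-local free list, no locking
  node **local_tail_next= nullptr; // address of the tail's next pointer
  int pending= 0;

  node **global_head;              // shared free list, guarded by *lock
  std::mutex *lock;

  void loc_free(node *n)
  {
    if (!local_head)
      local_tail_next= &n->next;   // first node freed becomes the tail
    else
      n->next= local_head;
    local_head= n;                 // prepend on the lock-free hot path
    if (++pending >= BATCH)
      flush();
  }

  void flush()
  {
    if (!local_head)
      return;
    std::lock_guard<std::mutex> g(*lock);
    *local_tail_next= *global_head; // O(1) splice: patch the tail link
    *global_head= local_head;
    local_head= nullptr;
    pending= 0;
  }
};

The real code uses the same tail-pointer trick (loc_qev_last_ptr_ptr and friends) so the whole local list is spliced in constant time while the mutex is held.
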
@@ -290,6 +299,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
   THD *thd= rgi->thd;
   rpl_parallel_entry *entry= rgi->parallel_entry;
   ulong retries= 0;
+  Format_description_log_event *description_event= NULL;
 
 do_retry:
   event_count= 0;
@@ -355,6 +365,14 @@ do_retry:
       goto err;
     }
     cur_offset= rgi->retry_start_offset;
+    delete description_event;
+    description_event=
+      read_relay_log_description_event(&rlog, cur_offset, &errmsg);
+    if (!description_event)
+    {
+      err= 1;
+      goto err;
+    }
     my_b_seek(&rlog, cur_offset);
 
     do
@@ -367,8 +385,7 @@ do_retry:
       for (;;)
       {
         old_offset= cur_offset;
-        ev= Log_event::read_log_event(&rlog, 0,
-                      rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+        ev= Log_event::read_log_event(&rlog, 0, description_event,
                       opt_slave_sql_verify_checksum);
         cur_offset= my_b_tell(&rlog);
@@ -416,7 +433,12 @@ do_retry:
         }
 
         event_type= ev->get_type_code();
-        if (!Log_event::is_group_event(event_type))
+        if (event_type == FORMAT_DESCRIPTION_EVENT)
+        {
+          delete description_event;
+          description_event= (Format_description_log_event *)ev;
+          continue;
+        } else if (!Log_event::is_group_event(event_type))
         {
           delete ev;
           continue;
@@ -424,7 +446,7 @@ do_retry:
         ev->thd= thd;
 
         mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-        qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+        qev= rpt->retry_get_qev(ev, orig_qev, log_name, old_offset,
                                 cur_offset - old_offset);
         mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
         if (!qev)
@@ -472,6 +494,8 @@ do_retry:
 
 err:
 
+  if (description_event)
+    delete description_event;
   if (fd >= 0)
   {
     end_io_cache(&rlog);
@@ -495,14 +519,8 @@ handle_rpl_parallel_thread(void *arg)
   rpl_group_info *group_rgi= NULL;
   group_commit_orderer *gco, *tmp_gco;
   uint64 event_gtid_sub_id= 0;
-  rpl_parallel_thread::queued_event *qevs_to_free;
-  rpl_group_info *rgis_to_free;
-  group_commit_orderer *gcos_to_free;
   rpl_sql_thread_info sql_info(NULL);
-  size_t total_event_size;
   int err;
-  inuse_relaylog *last_ir;
-  uint64 accumulated_ir_count;
 
   struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -544,6 +562,8 @@ handle_rpl_parallel_thread(void *arg)
   while (!rpt->stop)
   {
+    rpl_parallel_thread::queued_event *qev, *next_qev;
+
     thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
                     &stage_waiting_for_work_from_sql_thread, &old_stage);
 
     /*
@@ -565,28 +585,21 @@ handle_rpl_parallel_thread(void *arg)
     thd->EXIT_COND(&old_stage);
 
   more_events:
-    qevs_to_free= NULL;
-    rgis_to_free= NULL;
-    gcos_to_free= NULL;
-    total_event_size= 0;
-    while (events)
+    for (qev= events; qev; qev= next_qev)
     {
-      struct rpl_parallel_thread::queued_event *next= events->next;
       Log_event_type event_type;
-      rpl_group_info *rgi= events->rgi;
+      rpl_group_info *rgi= qev->rgi;
       rpl_parallel_entry *entry= rgi->parallel_entry;
       bool end_of_group, group_ending;
 
-      total_event_size+= events->event_size;
-      if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
+      next_qev= qev->next;
+      if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
       {
-        handle_queued_pos_update(thd, events);
-        events->next= qevs_to_free;
-        qevs_to_free= events;
-        events= next;
+        handle_queued_pos_update(thd, qev);
+        rpt->loc_free_qev(qev);
         continue;
       }
-      else if (events->typ ==
+      else if (qev->typ ==
                rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
       {
         if (in_event_group)
@@ -598,29 +611,34 @@ handle_rpl_parallel_thread(void *arg)
           group_rgi->cleanup_context(thd, 1);
           in_event_group= false;
           finish_event_group(thd, group_rgi->gtid_sub_id,
-                             events->entry_for_queued, group_rgi);
+                             qev->entry_for_queued, group_rgi);
-          group_rgi->next= rgis_to_free;
-          rgis_to_free= group_rgi;
+          rpt->loc_free_rgi(group_rgi);
           thd->rgi_slave= group_rgi= NULL;
         }
-        events->next= qevs_to_free;
-        qevs_to_free= events;
-        events= next;
+        rpt->loc_free_qev(qev);
         continue;
       }
 
-      DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
+      DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
       thd->rgi_slave= group_rgi= rgi;
       gco= rgi->gco;
      /* Handle a new event group, which will be initiated by a GTID event. */
-      if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
+      if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
       {
         bool did_enter_cond= false;
         PSI_stage_info old_stage;
         uint64 wait_count;
 
+        DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
+            if (rgi->current_gtid.domain_id == 0 &&
+                rgi->current_gtid.seq_no == 100) {
+              debug_sync_set_action(thd,
+                  STRING_WITH_LEN("now SIGNAL scheduled_gtid_0_x_100"));
+            }
+          });
+
         in_event_group= true;
         /*
           If the standalone flag is set, then this event group consists of a
@@ -628,7 +646,7 @@ handle_rpl_parallel_thread(void *arg)
           similar), without any terminating COMMIT/ROLLBACK/XID.
         */
         group_standalone=
-          (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
+          (0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 &
                  Gtid_log_event::FL_STANDALONE));
 
         event_gtid_sub_id= rgi->gtid_sub_id;
@@ -656,7 +674,7 @@ handle_rpl_parallel_thread(void *arg)
           DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
           thd->ENTER_COND(&gco->COND_group_commit_orderer,
                           &entry->LOCK_parallel_entry,
-                          &stage_waiting_for_prior_transaction_to_commit,
+                          &stage_waiting_for_prior_transaction_to_start_commit,
                           &old_stage);
           did_enter_cond= true;
           do
@@ -689,8 +707,7 @@ handle_rpl_parallel_thread(void *arg)
             */
             DBUG_ASSERT(!tmp_gco->prev_gco);
             gco->prev_gco= NULL;
-            tmp_gco->next_gco= gcos_to_free;
-            gcos_to_free= tmp_gco;
+            rpt->loc_free_gco(tmp_gco);
           }
 
           if (entry->force_abort && wait_count > entry->stop_count)
@@ -751,7 +768,7 @@ handle_rpl_parallel_thread(void *arg)
         }
       }
 
-      group_ending= is_group_ending(events->ev, event_type);
+      group_ending= is_group_ending(qev->ev, event_type);
       if (group_ending && likely(!rgi->worker_error))
       {
         DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
@@ -767,20 +784,32 @@ handle_rpl_parallel_thread(void *arg)
       if (likely(!rgi->worker_error) && !skip_event_group)
       {
         ++rgi->retry_event_count;
-        err= rpt_handle_event(events, rpt);
-        delete_or_keep_event_post_apply(rgi, event_type, events->ev);
+#ifndef DBUG_OFF
+        err= 0;
+        DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_xid",
+          if (event_type == XID_EVENT)
+          {
+            thd->clear_error();
+            thd->get_stmt_da()->reset_diagnostics_area();
+            my_error(ER_LOCK_DEADLOCK, MYF(0));
+            err= 1;
+          });
+        if (!err)
+#endif
+        err= rpt_handle_event(qev, rpt);
+        delete_or_keep_event_post_apply(rgi, event_type, qev->ev);
         DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
                         err= dbug_simulate_tmp_error(rgi, thd););
         if (err)
         {
           convert_kill_to_deadlock_error(rgi);
           if (has_temporary_error(thd) && slave_trans_retries > 0)
-            err= retry_event_group(rgi, rpt, events);
+            err= retry_event_group(rgi, rpt, qev);
         }
       }
       else
       {
-        delete events->ev;
+        delete qev->ev;
         err= thd->wait_for_prior_commit();
       }
@@ -789,8 +818,7 @@ handle_rpl_parallel_thread(void *arg)
         ((group_standalone && !Log_event::is_part_of_group(event_type)) ||
          group_ending);
 
-      events->next= qevs_to_free;
-      qevs_to_free= events;
+      rpt->loc_free_qev(qev);
 
       if (unlikely(err))
       {
@@ -805,61 +833,20 @@ handle_rpl_parallel_thread(void *arg)
       {
         in_event_group= false;
         finish_event_group(thd, event_gtid_sub_id, entry, rgi);
-        rgi->next= rgis_to_free;
-        rgis_to_free= rgi;
+        rpt->loc_free_rgi(rgi);
         thd->rgi_slave= group_rgi= rgi= NULL;
         skip_event_group= false;
         DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
       }
-      events= next;
     }
 
     mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-    /* Signal that our queue can now accept more events. */
-    rpt->dequeue2(total_event_size);
-    mysql_cond_signal(&rpt->COND_rpl_thread_queue);
-
-    /* We need to delay the free here, to when we have the lock. */
-    while (gcos_to_free)
-    {
-      group_commit_orderer *next= gcos_to_free->next_gco;
-      rpt->free_gco(gcos_to_free);
-      gcos_to_free= next;
-    }
-    while (rgis_to_free)
-    {
-      rpl_group_info *next= rgis_to_free->next;
-      rpt->free_rgi(rgis_to_free);
-      rgis_to_free= next;
-    }
-    last_ir= NULL;
-    accumulated_ir_count= 0;
-    while (qevs_to_free)
-    {
-      rpl_parallel_thread::queued_event *next= qevs_to_free->next;
-      inuse_relaylog *ir= qevs_to_free->ir;
-      /* Batch up refcount update to reduce use of synchronised operations. */
-      if (last_ir != ir)
-      {
-        if (last_ir)
-        {
-          my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
-          my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
-          my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
-          accumulated_ir_count= 0;
-        }
-        last_ir= ir;
-      }
-      ++accumulated_ir_count;
-      rpt->free_qev(qevs_to_free);
-      qevs_to_free= next;
-    }
-    if (last_ir)
-    {
-      my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
-      my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
-      my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
-    }
+    /*
+      Now that we have the lock, we can move everything from our local free
+      lists to the real free lists that are also accessible from the SQL
+      driver thread.
+    */
+    rpt->batch_free();
 
     if ((events= rpt->event_queue) != NULL)
     {
@@ -872,6 +859,7 @@ handle_rpl_parallel_thread(void *arg)
       mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
       goto more_events;
     }
+    rpt->inuse_relaylog_refcount_update();
 
     if (in_event_group && group_rgi->parallel_entry->force_abort)
     {
@@ -1107,6 +1095,51 @@ err:
 }
 
 
+void
+rpl_parallel_thread::batch_free()
+{
+  mysql_mutex_assert_owner(&LOCK_rpl_thread);
+  if (loc_qev_list)
+  {
+    *loc_qev_last_ptr_ptr= qev_free_list;
+    qev_free_list= loc_qev_list;
+    loc_qev_list= NULL;
+    dequeue2(loc_qev_size);
+    /* Signal that our queue can now accept more events. */
+    mysql_cond_signal(&COND_rpl_thread_queue);
+    loc_qev_size= 0;
+    qev_free_pending= 0;
+  }
+  if (loc_rgi_list)
+  {
+    *loc_rgi_last_ptr_ptr= rgi_free_list;
+    rgi_free_list= loc_rgi_list;
+    loc_rgi_list= NULL;
+  }
+  if (loc_gco_list)
+  {
+    *loc_gco_last_ptr_ptr= gco_free_list;
+    gco_free_list= loc_gco_list;
+    loc_gco_list= NULL;
+  }
+}
+
+
+void
+rpl_parallel_thread::inuse_relaylog_refcount_update()
+{
+  inuse_relaylog *ir= accumulated_ir_last;
+  if (ir)
+  {
+    my_atomic_rwlock_wrlock(&ir->rli->inuse_relaylog_atomic_lock);
+    my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
+    my_atomic_rwlock_wrunlock(&ir->rli->inuse_relaylog_atomic_lock);
+    accumulated_ir_count= 0;
+    accumulated_ir_last= NULL;
+  }
+}
+
+
 rpl_parallel_thread::queued_event *
 rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
 {
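
Aside: inuse_relaylog_refcount_update() above batches reference-count updates. Consecutive freed events almost always come from the same relay log, so the worker counts locally and publishes a single atomic add when the relay log changes or when the batch is flushed. A simplified sketch of that pattern using standard C++ atomics instead of the server's my_atomic wrappers (generic names, my own illustration):

#include <atomic>
#include <cstdint>

struct relay_log_usage {
  std::atomic<uint64_t> dequeued_count{0}; // like inuse_relaylog::dequeued_count
};

struct refcount_batcher {
  relay_log_usage *last= nullptr; // like accumulated_ir_last
  uint64_t pending= 0;            // like accumulated_ir_count

  void note_dequeued(relay_log_usage *ir)
  {
    if (ir != last)
    {
      publish();                  // flush the count for the previous log
      last= ir;
    }
    ++pending;                    // cheap local increment on the hot path
  }

  void publish()
  {
    if (last)
    {
      // One shared-memory atomic per relay-log switch instead of per event.
      last->dequeued_count.fetch_add(pending, std::memory_order_relaxed);
      pending= 0;
      last= nullptr;
    }
  }
};
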
@@ -1160,6 +1193,43 @@ rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
 }
 
 
+void
+rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev)
+{
+  inuse_relaylog *ir= qev->ir;
+  inuse_relaylog *last_ir= accumulated_ir_last;
+  if (ir != last_ir)
+  {
+    if (last_ir)
+      inuse_relaylog_refcount_update();
+    accumulated_ir_last= ir;
+  }
+  ++accumulated_ir_count;
+  if (!loc_qev_list)
+    loc_qev_last_ptr_ptr= &qev->next;
+  else
+    qev->next= loc_qev_list;
+  loc_qev_list= qev;
+  loc_qev_size+= qev->event_size;
+  /*
+    We want to release to the global free list only occasionally, to avoid
+    having to take the LOCK_rpl_thread mutex too many times.
+
+    However, we do need to release regularly. If we let the unreleased part
+    grow too large, then the SQL driver thread may go to sleep waiting for
+    the queue to drop below opt_slave_parallel_max_queued, and this in turn
+    can stall all other worker threads for more stuff to do.
+  */
+  if (++qev_free_pending >= QEV_BATCH_FREE ||
+      loc_qev_size >= opt_slave_parallel_max_queued/3)
+  {
+    mysql_mutex_lock(&LOCK_rpl_thread);
+    batch_free();
+    mysql_mutex_unlock(&LOCK_rpl_thread);
+  }
+}
+
+
 void
 rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
 {
@@ -1208,6 +1278,19 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
 }
 
 
+void
+rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi)
+{
+  DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
+  rgi->free_annotate_event();
+  if (!loc_rgi_list)
+    loc_rgi_last_ptr_ptr= &rgi->next;
+  else
+    rgi->next= loc_rgi_list;
+  loc_rgi_list= rgi;
+}
+
+
 void
 rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
 {
@@ -1242,12 +1325,14 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
 
 void
-rpl_parallel_thread::free_gco(group_commit_orderer *gco)
+rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
 {
-  mysql_mutex_assert_owner(&LOCK_rpl_thread);
   DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
-  gco->next_gco= gco_free_list;
-  gco_free_list= gco;
+  if (!loc_gco_list)
+    loc_gco_last_ptr_ptr= &gco->next_gco;
+  else
+    gco->next_gco= loc_gco_list;
+  loc_gco_list= gco;
 }
@@ -1683,6 +1768,7 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
   qev->ir= rli->last_inuse_relaylog;
   ++qev->ir->queued_count;
   thr->enqueue(qev);
+  mysql_cond_signal(&thr->COND_rpl_thread);
   mysql_mutex_unlock(&thr->LOCK_rpl_thread);
   return 0;
 }
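
Aside: the last hunk adds a missing wakeup. The queueing thread now signals COND_rpl_thread while still holding LOCK_rpl_thread, so a worker sleeping in ENTER_COND wakes as soon as the restart event is enqueued instead of waiting for the next signal. The same producer/consumer handshake in portable C++ (generic types, for illustration only, not the server's THD/queue machinery):

#include <condition_variable>
#include <deque>
#include <mutex>

struct worker_queue {
  std::mutex lock;               // plays the role of LOCK_rpl_thread
  std::condition_variable cond;  // plays the role of COND_rpl_thread
  std::deque<int> events;

  void enqueue(int ev)
  {
    std::lock_guard<std::mutex> g(lock);
    events.push_back(ev);
    cond.notify_one();           // the wakeup the patch adds, in effect
  }

  int wait_and_dequeue()
  {
    std::unique_lock<std::mutex> g(lock);
    cond.wait(g, [this]{ return !events.empty(); });
    int ev= events.front();
    events.pop_front();
    return ev;
  }
};
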