Merge branch '10.6' into 10.11
@@ -215,6 +215,13 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
       signal_error_to_sql_driver_thread(thd, rgi, err);
   thd->wait_for_commit_ptr= NULL;
 
+  /*
+    Calls to check_duplicate_gtid() must match up with
+    record_and_update_gtid() (or release_domain_owner() in error case). This
+    assertion tries to catch any missing release of the domain.
+  */
+  DBUG_ASSERT(rgi->gtid_ignore_duplicate_state != rpl_group_info::GTID_DUPLICATE_OWNER);
+
   mysql_mutex_lock(&entry->LOCK_parallel_entry);
   /*
     We need to mark that this event group started its commit phase, in case we
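The invariant behind the new assertion: with --gtid-ignore-duplicates, a successful check_duplicate_gtid() makes this connection the owner of the GTID domain, and ownership must be dropped again via record_and_update_gtid() (success) or release_domain_owner() (error) before the event group finishes. A minimal standalone sketch of that state machine, with hypothetical names modelled loosely on rpl_group_info (illustration only, not server code):

    // Sketch only: models the acquire/release pairing checked by the new
    // DBUG_ASSERT in finish_event_group(). All types here are hypothetical.
    #include <cassert>

    enum gtid_dup_state { GTID_DUPLICATE_NULL, GTID_DUPLICATE_OWNER };

    struct group_info_model {
      gtid_dup_state gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;

      // check_duplicate_gtid() succeeding corresponds to taking ownership.
      void acquire_domain() { gtid_ignore_duplicate_state= GTID_DUPLICATE_OWNER; }

      // record_and_update_gtid() on success, release_domain_owner() on error:
      // either path must drop ownership again.
      void release_domain() { gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; }

      void finish_event_group() {
        // The invariant: no event group may finish while still owning the domain.
        assert(gtid_ignore_duplicate_state != GTID_DUPLICATE_OWNER);
      }
    };

    int main() {
      group_info_model rgi;
      rgi.acquire_domain();
      rgi.release_domain();   // omitting this line would trip the assertion
      rgi.finish_event_group();
      return 0;
    }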
@@ -908,7 +915,13 @@ do_retry:
     });
 #endif
 
-  rgi->cleanup_context(thd, 1);
+  /*
+    We are still applying the event group, even though we will roll it back
+    and retry it. So for --gtid-ignore-duplicates, keep ownership of the
+    domain during the retry so another master connection will not try to take
+    over and duplicate apply the same event group (MDEV-33475).
+  */
+  rgi->cleanup_context(thd, 1, 1 /* keep_domain_owner */);
   wait_for_pending_deadlock_kill(thd, rgi);
   thd->reset_killed();
   thd->clear_error();
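What the extra argument buys, sketched with a hypothetical stand-in for cleanup_context() (only the keep_domain_owner parameter comes from the diff; everything else is invented for illustration): on a temporary-error retry the event group is rolled back, but domain ownership is retained so a second --gtid-ignore-duplicates master connection cannot apply the same group in parallel.

    // Sketch only: hypothetical model of the retry path in do_retry.
    #include <iostream>

    struct domain { bool owned= false; };

    // Stand-in for rgi->cleanup_context(thd, error, keep_domain_owner):
    void cleanup_context(domain &d, bool /*error*/, bool keep_domain_owner) {
      // ... roll back the partially applied event group (elided) ...
      if (!keep_domain_owner)
        d.owned= false;  // pre-MDEV-33475 behaviour: ownership dropped on retry
    }

    int main() {
      domain d{true};                   // we own the domain while applying
      cleanup_context(d, true, true);   // retry: roll back but keep ownership
      std::cout << (d.owned ? "still owned across retry\n"
                            : "lost: another connection could duplicate-apply\n");
      return 0;
    }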
@@ -2405,13 +2418,17 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
     false   Worker not allocated (choose_thread_internal not called)
 */
 static bool handle_split_alter(rpl_parallel_entry *e,
-                               Gtid_log_event *gtid_ev, uint32 *idx,
+                               Gtid_log_event *gtid_ev,
+                               //uint32 *idx,
+                               rpl_parallel_entry::sched_bucket **ptr_cur_thr,
+                               //choose_thread_internal specific
                                bool *did_enter_cond, rpl_group_info* rgi,
                                PSI_stage_info *old_stage)
 {
   uint16 flags_extra= gtid_ev->flags_extra;
   bool thread_allocated= false;
+  uint32 i= 0, *idx= &i;
 
   //Step 1
   if (flags_extra & Gtid_log_event::FL_START_ALTER_E1 ||
       //This will arrange finding threads for CA/RA as well
@@ -2422,11 +2439,12 @@ static bool handle_split_alter(rpl_parallel_entry *e,
     j is needed for round robin scheduling, we will start with rpl_thread_idx
     go till rpl_thread_max and then start with 0 to rpl_thread_idx
     */
-    int j= e->rpl_thread_idx;
+    auto j= static_cast<uint32>(e->thread_sched_fifo->head() - e->rpl_threads); // formerly e->rpl_thread_idx;
     for(uint i= 0; i < e->rpl_thread_max; i++)
     {
-      if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner
-          != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id)
+      if (!e->rpl_threads[j].thr ||
+          e->rpl_threads[j].thr->current_owner != &e->rpl_threads[j].thr ||
+          !e->rpl_threads[j].thr->current_start_alter_id)
       {
         //This condition will hit atleast one time no matter what happens
         *idx= j;
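The loop above, reduced to a standalone sketch (a simplified worker struct instead of sched_bucket, and a plain return instead of the goto): starting from the index of the current FIFO head, probe all rpl_thread_max slots round-robin and pick the first worker that is free or has no start-alter in progress.

    // Sketch only: the round-robin probe, with a simplified worker layout.
    #include <cstdint>
    #include <optional>
    #include <vector>

    struct worker_model {
      bool busy= false;                    // approximates the current_owner check
      uint64_t current_start_alter_id= 0;
    };

    std::optional<uint32_t>
    find_start_alter_slot(const std::vector<worker_model> &w, uint32_t start_idx) {
      uint32_t j= start_idx;
      for (uint32_t i= 0; i < w.size(); i++) {
        if (!w[j].busy || !w[j].current_start_alter_id)
          return j;                        // corresponds to "goto idx_found"
        j= (j + 1) % static_cast<uint32_t>(w.size());
      }
      return std::nullopt;                 // unreachable in the server (DBUG_ASSERT(0))
    }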
@@ -2437,17 +2455,26 @@ static bool handle_split_alter(rpl_parallel_entry *e,
       j= j % e->rpl_thread_max;
     }
     //We did not find and idx
-    DBUG_ASSERT(0);
-    return false;
+    DBUG_ASSERT(0);
+
+    return false;
+
 idx_found:
-    e->rpl_thread_idx= *idx;
-    e->choose_thread_internal(*idx, did_enter_cond, rgi, old_stage);
+    //e->rpl_thread_idx= *idx;
+    /* place the found *idx index into the head */
+    *ptr_cur_thr= &e->rpl_threads[*idx];
+    (*ptr_cur_thr)->unlink();
+    e->thread_sched_fifo->append(*ptr_cur_thr);
+    *ptr_cur_thr= e->thread_sched_fifo->head();
+
+    e->choose_thread_internal(*ptr_cur_thr, did_enter_cond, rgi,
+                              old_stage);
     thread_allocated= true;
     if (flags_extra & Gtid_log_event::FL_START_ALTER_E1)
     {
-      mysql_mutex_assert_owner(&e->rpl_threads[*idx]->LOCK_rpl_thread);
-      e->rpl_threads[e->rpl_thread_idx]->current_start_alter_id= gtid_ev->seq_no;
-      e->rpl_threads[e->rpl_thread_idx]->current_start_alter_domain_id=
+      mysql_mutex_assert_owner(&e->rpl_threads[*idx].thr->LOCK_rpl_thread);
+      e->rpl_threads[*idx].thr->current_start_alter_id= gtid_ev->seq_no;
+      e->rpl_threads[*idx].thr->current_start_alter_domain_id=
         gtid_ev->domain_id;
       /*
         We are locking LOCK_rpl_thread_pool becuase we are going to update
@@ -2463,9 +2490,9 @@ idx_found:
     }
     else
     {
-      e->rpl_threads[*idx]->reserved_start_alter_thread= true;
-      e->rpl_threads[*idx]->current_start_alter_id= 0;
-      e->rpl_threads[*idx]->current_start_alter_domain_id= 0;
+      e->rpl_threads[*idx].thr->reserved_start_alter_thread= true;
+      e->rpl_threads[*idx].thr->current_start_alter_id= 0;
+      e->rpl_threads[*idx].thr->current_start_alter_domain_id= 0;
     }
     mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
   }
@@ -2476,13 +2503,13 @@ idx_found:
     //Free the corrosponding rpt current_start_alter_id
     for(uint i= 0; i < e->rpl_thread_max; i++)
     {
-      if(e->rpl_threads[i] &&
-         e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no &&
-         e->rpl_threads[i]->current_start_alter_domain_id == gtid_ev->domain_id)
+      if(e->rpl_threads[i].thr &&
+         e->rpl_threads[i].thr->current_start_alter_id == gtid_ev->sa_seq_no &&
+         e->rpl_threads[i].thr->current_start_alter_domain_id == gtid_ev->domain_id)
       {
         mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
-        e->rpl_threads[i]->current_start_alter_id= 0;
-        e->rpl_threads[i]->current_start_alter_domain_id= 0;
+        e->rpl_threads[i].thr->current_start_alter_id= 0;
+        e->rpl_threads[i].thr->current_start_alter_domain_id= 0;
         global_rpl_thread_pool.current_start_alters--;
         e->pending_start_alters--;
         DBUG_PRINT("info", ("Commit/Rollback alter id %d", i));
@@ -2497,6 +2524,79 @@ idx_found:
 }
 
 
+/*
+  Check when we have done a complete round of scheduling for workers
+  0, 1, ..., (rpl_thread_max-1), in this order.
+  This often occurs every rpl_thread_max event group, but XA XID dependency
+  restrictions can cause insertion of extra out-of-order worker scheduling
+  in-between the normal round-robin scheduling.
+*/
+void
+rpl_parallel_entry::check_scheduling_generation(sched_bucket *cur)
+{
+  uint32 idx= static_cast<uint32>(cur - rpl_threads);
+  DBUG_ASSERT(cur >= rpl_threads);
+  DBUG_ASSERT(cur < rpl_threads + rpl_thread_max);
+  if (idx == current_generation_idx)
+  {
+    ++idx;
+    if (idx >= rpl_thread_max)
+    {
+      /* A new generation; all workers have been scheduled at least once. */
+      idx= 0;
+      ++current_generation;
+    }
+    current_generation_idx= idx;
+  }
+}
+
+
+rpl_parallel_entry::sched_bucket *
+rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid)
+{
+  uint64 cur_gen= current_generation;
+  my_off_t i= 0;
+  while (i < maybe_active_xid.elements)
+  {
+    /*
+      Purge no longer active XID from the list:
+
+       - In generation N, XID might have been scheduled for worker W.
+       - Events in generation (N+1) might run freely in parallel with W.
+       - Events in generation (N+2) will have done wait_for_prior_commit for
+         the event group with XID (or a later one), but the XID might still be
+         active for a bit longer after wakeup_prior_commit().
+       - Events in generation (N+3) will have done wait_for_prior_commit() for
+         an event in W _after_ the XID, so are sure not to see the XID active.
+
+      Therefore, XID can be safely scheduled to a different worker in
+      generation (N+3) when last prior use was in generation N (or earlier).
+    */
+    xid_active_generation *a=
+      dynamic_element(&maybe_active_xid, i, xid_active_generation *);
+    if (a->generation + 3 <= cur_gen)
+    {
+      *a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid));
+      continue;
+    }
+    if (xid->eq(&a->xid))
+    {
+      /* Update the last used generation and return the match. */
+      a->generation= cur_gen;
+      return a->thr;
+    }
+    ++i;
+  }
+  /* try to keep allocated memory in the range of [2,10] * initial_chunk_size */
+  if (maybe_active_xid.elements <= 2 * active_xid_init_alloc() &&
+      maybe_active_xid.max_element > 10 * active_xid_init_alloc())
+    freeze_size(&maybe_active_xid);
+
+  /* No matching XID conflicts. */
+  return nullptr;
+}
+
+
 /*
   Obtain a worker thread that we can queue an event to.
 
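The (N+3) rule in check_xa_xid_dependency() can be exercised on its own. Below is a standalone model using std::vector in place of the DYNAMIC_ARRAY: entries last used in generation N are purged once the current generation reaches N+3, and removal overwrites the entry with the last element (the pop_dynamic() trick) rather than preserving order.

    // Sketch only: generation-based purge of possibly-active XIDs.
    #include <cstdint>
    #include <vector>

    struct active_xid_model { uint64_t xid; uint64_t generation; };

    void purge_inactive(std::vector<active_xid_model> &v, uint64_t cur_gen) {
      size_t i= 0;
      while (i < v.size()) {
        if (v[i].generation + 3 <= cur_gen) {  // same test as the server code
          v[i]= v.back();                      // swap-with-last removal ...
          v.pop_back();                        // ... then re-check index i
          continue;
        }
        // A matching XID would have its generation refreshed to cur_gen and be
        // returned, pinning the new event group to the same worker (elided).
        ++i;                                   // entry still possibly active
      }
    }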
@@ -2529,40 +2629,70 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                   PSI_stage_info *old_stage,
                                   Gtid_log_event *gtid_ev)
 {
-  uint32 idx;
+  sched_bucket *cur_thr;
 
-  idx= rpl_thread_idx;
   if (gtid_ev)
   {
-    if (++idx >= rpl_thread_max)
-      idx= 0;
+    /* New event group; cycle the thread scheduling buckets round-robin. */
+    thread_sched_fifo->push_back(thread_sched_fifo->get());
+
     //rpl_thread_idx will be updated handle_split_alter
-    if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage))
-      return rpl_threads[idx];
+    if (handle_split_alter(this, gtid_ev, &cur_thr, did_enter_cond, rgi,
+                           old_stage))
+      return cur_thr->thr;
 
     if (gtid_ev->flags2 &
         (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
-      idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
-                        gtid_ev->xid.key_length()) % rpl_thread_max;
-    rpl_thread_idx= idx;
+    {
+      if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid)))
+      {
+        /*
+          A previously scheduled event group with the same XID might still be
+          active in a worker, so schedule this event group in the same worker
+          to avoid a conflict.
+        */
+        cur_thr->unlink();
+        thread_sched_fifo->append(cur_thr);
+      }
+      else
+      {
+        /* Record this XID now active. */
+        xid_active_generation *a=
+          (xid_active_generation *)alloc_dynamic(&maybe_active_xid);
+        if (!a)
+          return NULL;
+        a->thr= cur_thr= thread_sched_fifo->head();
+        a->generation= current_generation;
+        a->xid.set(&gtid_ev->xid);
+      }
+    }
+    else
+      cur_thr= thread_sched_fifo->head();
+
+    check_scheduling_generation(cur_thr);
   }
-  return choose_thread_internal(idx, did_enter_cond, rgi, old_stage);
+  else
+    cur_thr= thread_sched_fifo->head();
+
+  return choose_thread_internal(cur_thr /*idx*/, did_enter_cond, rgi, old_stage);
 }
 
-rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
-                          bool *did_enter_cond, rpl_group_info *rgi,
-                          PSI_stage_info *old_stage)
+rpl_parallel_thread *
+rpl_parallel_entry::choose_thread_internal(sched_bucket *cur_thr,
+                                           bool *did_enter_cond,
+                                           rpl_group_info *rgi,
+                                           PSI_stage_info *old_stage)
 {
-  rpl_parallel_thread* thr= rpl_threads[idx];
   Relay_log_info *rli= rgi->rli;
+  rpl_parallel_thread *thr= cur_thr->thr;
 
   if (thr)
   {
     *did_enter_cond= false;
     mysql_mutex_lock(&thr->LOCK_rpl_thread);
     for (;;)
     {
-      if (thr->current_owner != &rpl_threads[idx])
+      if (thr->current_owner != &cur_thr->thr)
       {
         /*
           The worker thread became idle, and returned to the free list and
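The new scheduling policy is easier to see with an ordinary std::list standing in for the intrusive I_List<sched_bucket>. This is an illustrative model, not server code, and it assumes I_List::append() places the bucket at the head, as the "place the found *idx index into the head" comment indicates: a new event group rotates the old head to the tail (push_back(get())), while a group pinned to a bucket (XA XID match or split ALTER) is moved to the head (unlink() plus append()).

    // Sketch only: std::list stands in for the intrusive I_List<sched_bucket>.
    #include <algorithm>
    #include <iostream>
    #include <list>

    struct fifo_model {
      std::list<int> fifo;                 // front() plays the role of head()

      explicit fifo_model(int n) {
        fifo.push_back(n - 1);             // highest bucket first, so the first
        for (int i= 0; i < n - 1; i++)     // rotation schedules bucket 0
          fifo.push_back(i);
      }

      int schedule_round_robin() {
        // thread_sched_fifo->push_back(thread_sched_fifo->get()):
        fifo.splice(fifo.end(), fifo, fifo.begin());
        return fifo.front();
      }

      int schedule_pinned(int bucket) {
        // cur_thr->unlink(); thread_sched_fifo->append(cur_thr):
        auto it= std::find(fifo.begin(), fifo.end(), bucket);
        fifo.splice(fifo.begin(), fifo, it);
        return fifo.front();
      }
    };

    int main() {
      fifo_model s(4);
      std::cout << s.schedule_round_robin()   // 0
                << s.schedule_round_robin()   // 1
                << s.schedule_pinned(0)       // 0 again (XID pinned)
                << s.schedule_round_robin()   // 1: bucket 0 rotates to the tail
                << "\n";
      return 0;
    }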
@@ -2594,17 +2724,16 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
           and this can cause THD::awake to use the wrong mutex.
         */
 #ifdef ENABLED_DEBUG_SYNC
-        DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
-          {
-            debug_sync_set_action(rli->sql_driver_thd,
-                    STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
-          };);
+        DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", {
+          debug_sync_set_action(
+              rli->sql_driver_thd,
+              STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+        };);
 #endif
         rli->sql_driver_thd->set_time_for_next_stage();
-        rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue,
-                                        &thr->LOCK_rpl_thread,
-                                        &stage_waiting_for_room_in_worker_thread,
-                                        old_stage);
+        rli->sql_driver_thd->ENTER_COND(
+            &thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread,
+            &stage_waiting_for_room_in_worker_thread, old_stage);
         *did_enter_cond= true;
       }
 
@@ -2614,11 +2743,11 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
                                  did_enter_cond, old_stage);
           my_error(ER_CONNECTION_KILLED, MYF(0));
 #ifdef ENABLED_DEBUG_SYNC
-          DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
-            {
-              debug_sync_set_action(rli->sql_driver_thd,
-                      STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
-            };);
+          DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", {
+            debug_sync_set_action(
+                rli->sql_driver_thd,
+                STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+          };);
 #endif
           slave_output_error_info(rgi, rli->sql_driver_thd);
           return NULL;
@@ -2628,9 +2757,10 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
       }
     }
   }
 
   if (!thr)
-    rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
-                                                             this);
+    cur_thr->thr= thr=
+      global_rpl_thread_pool.get_thread(&cur_thr->thr, this);
+
   return thr;
 }
@@ -2645,6 +2775,7 @@ free_rpl_parallel_entry(void *element)
     dealloc_gco(e->current_gco);
     e->current_gco= prev_gco;
   }
+  delete_dynamic(&e->maybe_active_xid);
   mysql_cond_destroy(&e->COND_parallel_entry);
   mysql_mutex_destroy(&e->LOCK_parallel_entry);
   my_free(e);
@@ -2688,17 +2819,37 @@ rpl_parallel::find(uint32 domain_id, Relay_log_info *rli)
     ulong count= opt_slave_domain_parallel_threads;
     if (count == 0 || count > opt_slave_parallel_threads)
       count= opt_slave_parallel_threads;
-    rpl_parallel_thread **p;
+    rpl_parallel_entry::sched_bucket *p;
+    I_List<rpl_parallel_entry::sched_bucket> *fifo;
     if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL),
                          &e, sizeof(*e),
                          &p, count*sizeof(*p),
+                         &fifo, sizeof(*fifo),
                          NULL))
     {
       my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
       return NULL;
     }
+    /* Initialize a FIFO of scheduled worker threads. */
+    e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>;
+    /*
+      (We cycle the FIFO _before_ allocating next entry in
+      rpl_parallel_entry::choose_thread(). So initialize the FIFO with the
+      highest element at the front, just so that the first event group gets
+      scheduled on entry 0).
+    */
+    e->thread_sched_fifo->
+      push_back(::new (p+count-1) rpl_parallel_entry::sched_bucket);
+    for (ulong i= 0; i < count-1; ++i)
+      e->thread_sched_fifo->
+        push_back(::new (p+i) rpl_parallel_entry::sched_bucket);
     e->rpl_threads= p;
     e->rpl_thread_max= count;
+    e->current_generation = 0;
+    e->current_generation_idx = 0;
+    init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid,
+                        sizeof(rpl_parallel_entry::xid_active_generation),
+                        0, e->active_xid_init_alloc(), 0, MYF(0));
     e->domain_id= domain_id;
     e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
     e->pause_sub_id= (uint64)ULONGLONG_MAX;
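The allocation pattern above, sketched standalone: one zero-filled block (my_multi_malloc with MY_ZEROFILL in the server; plain calloc with hand-computed offsets below) carries the bucket array and the FIFO header, and the objects are then constructed in place with placement new, highest bucket first so that the pre-rotation in choose_thread() makes bucket 0 the first one scheduled.

    // Sketch only: single-block allocation plus placement new.
    #include <cstdlib>
    #include <list>
    #include <memory>
    #include <new>

    struct bucket_model { bucket_model *next= nullptr; };  // stand-in for sched_bucket

    int main() {
      const unsigned long count= 8;
      // my_multi_malloc computes these offsets for you and zero-fills:
      char *block= static_cast<char *>(
          calloc(1, count * sizeof(bucket_model) + sizeof(std::list<bucket_model *>)));
      if (!block)
        return 1;  // mirrors the my_error(ER_OUTOFMEMORY) path
      bucket_model *p= reinterpret_cast<bucket_model *>(block);
      auto *fifo= new (block + count * sizeof(bucket_model)) std::list<bucket_model *>;
      // Highest element pushed first, so the first FIFO rotation lands on p[0]:
      fifo->push_back(::new (p + count - 1) bucket_model);
      for (unsigned long i= 0; i < count - 1; ++i)
        fifo->push_back(::new (p + i) bucket_model);
      std::destroy_at(fifo);  // placement-new objects need explicit destruction
      free(block);
      return 0;
    }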
@@ -2768,10 +2919,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
       mysql_mutex_unlock(&e->LOCK_parallel_entry);
       for (j= 0; j < e->rpl_thread_max; ++j)
       {
-        if ((rpt= e->rpl_threads[j]))
+        if ((rpt= e->rpl_threads[j].thr))
         {
           mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-          if (rpt->current_owner == &e->rpl_threads[j])
+          if (rpt->current_owner == &e->rpl_threads[j].thr)
             mysql_cond_signal(&rpt->COND_rpl_thread);
           mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
         }
@@ -2830,10 +2981,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
     for (j= 0; j < e->rpl_thread_max; ++j)
     {
-      if ((rpt= e->rpl_threads[j]))
+      if ((rpt= e->rpl_threads[j].thr))
       {
         mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-        while (rpt->current_owner == &e->rpl_threads[j])
+        while (rpt->current_owner == &e->rpl_threads[j].thr)
           mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
         mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
       }
@@ -2891,7 +3042,7 @@ int
 rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
                                          Format_description_log_event *fdev)
 {
-  uint32 idx;
+  sched_bucket *cur_thr;
   rpl_parallel_thread *thr;
   rpl_parallel_thread::queued_event *qev;
   Relay_log_info *rli= rgi->rli;
@@ -2906,12 +3057,12 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
     Thus there is no need for the full complexity of choose_thread(). We only
     need to check if we have a current worker thread, and queue for it if so.
   */
-  idx= rpl_thread_idx;
-  thr= rpl_threads[idx];
+  cur_thr= thread_sched_fifo->head();
+  thr= cur_thr->thr;
   if (!thr)
     return 0;
   mysql_mutex_lock(&thr->LOCK_rpl_thread);
-  if (thr->current_owner != &rpl_threads[idx])
+  if (thr->current_owner != &cur_thr->thr)
   {
     /* No active worker thread, so no need to queue the master restart. */
     mysql_mutex_unlock(&thr->LOCK_rpl_thread);