diff --git a/sql/log.cc b/sql/log.cc index 49688a8b3c6..e29758b7f0b 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -5778,43 +5778,12 @@ TC_LOG_queued::reverse_queue(TC_LOG_queued::TC_group_commit_entry *queue) return prev; } -void -TC_LOG_queued::group_commit_wait_for_wakeup(TC_group_commit_entry *entry) -{ - THD *thd= entry->thd; - pthread_mutex_lock(&thd->LOCK_commit_ordered); - while (!entry->group_commit_ready) - pthread_cond_wait(&thd->COND_commit_ordered, - &thd->LOCK_commit_ordered); - pthread_mutex_unlock(&thd->LOCK_commit_ordered); -} - -void -TC_LOG_queued::group_commit_wakeup_other(TC_group_commit_entry *other) -{ - THD *thd= other->thd; - pthread_mutex_lock(&thd->LOCK_commit_ordered); - other->group_commit_ready= TRUE; - pthread_cond_signal(&thd->COND_commit_ordered); - pthread_mutex_unlock(&thd->LOCK_commit_ordered); -} - -TC_LOG_unordered::TC_LOG_unordered() : group_commit_queue_busy(0) -{ - pthread_cond_init(&COND_queue_busy, 0); -} - -TC_LOG_unordered::~TC_LOG_unordered() -{ - pthread_cond_destroy(&COND_queue_busy); -} - -int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, - bool need_prepare_ordered, - bool need_commit_ordered) +int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) { int cookie; - struct TC_group_commit_entry entry; + struct commit_entry entry; bool is_group_commit_leader; LINT_INIT(is_group_commit_leader); @@ -5828,18 +5797,18 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, Must put us in queue so we can run_commit_ordered() in same sequence as we did run_prepare_ordered(). */ + thd->clear_wakeup_ready(); entry.thd= thd; - entry.group_commit_ready= false; - TC_group_commit_entry *previous_queue= group_commit_queue; + commit_entry *previous_queue= commit_ordered_queue; entry.next= previous_queue; - group_commit_queue= &entry; + commit_ordered_queue= &entry; is_group_commit_leader= (previous_queue == NULL); } pthread_mutex_unlock(&LOCK_prepare_ordered); } if (xid) - cookie= log_xid(thd, xid); + cookie= log_one_transaction(xid); else cookie= 0; @@ -5859,24 +5828,32 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, { /* The first in queue starts the ball rolling. */ pthread_mutex_lock(&LOCK_prepare_ordered); - while (group_commit_queue_busy) + while (commit_ordered_queue_busy) pthread_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered); - TC_group_commit_entry *queue= group_commit_queue; - group_commit_queue= NULL; + commit_entry *queue= commit_ordered_queue; + commit_ordered_queue= NULL; /* Mark the queue busy while we bounce it from one thread to the next. */ - group_commit_queue_busy= TRUE; + commit_ordered_queue_busy= true; pthread_mutex_unlock(&LOCK_prepare_ordered); - queue= reverse_queue(queue); - DBUG_ASSERT(queue == &entry && queue->thd == thd); + /* Reverse the queue list so we get correct order. */ + commit_entry *prev= NULL; + while (queue) + { + commit_entry *next= queue->next; + queue->next= prev; + prev= queue; + queue= next; + } + DBUG_ASSERT(prev == &entry && prev->thd == thd); } else { /* Not first in queue; just wait until previous thread wakes us up. */ - group_commit_wait_for_wakeup(&entry); + thd->wait_for_wakeup_ready(); } } @@ -5890,15 +5867,15 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, if (need_prepare_ordered) { - TC_group_commit_entry *next= entry.next; + commit_entry *next= entry.next; if (next) { - group_commit_wakeup_other(next); + next->thd->signal_wakeup_ready(); } else { pthread_mutex_lock(&LOCK_prepare_ordered); - group_commit_queue_busy= FALSE; + commit_ordered_queue_busy= false; pthread_cond_signal(&COND_queue_busy); pthread_mutex_unlock(&LOCK_prepare_ordered); } @@ -5940,9 +5917,9 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, struct TC_group_commit_entry entry; bool is_group_commit_leader; + thd->clear_wakeup_ready(); entry.thd= thd; entry.all= all; - entry.group_commit_ready= false; entry.xid_error= 0; pthread_mutex_lock(&LOCK_prepare_ordered); @@ -6019,7 +5996,7 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, */ TC_group_commit_entry *next= current->next; if (current != &entry) // Don't wake up ourself - group_commit_wakeup_other(current); + current->thd->signal_wakeup_ready(); current= next; } while (current != NULL); DEBUG_SYNC(thd, "commit_after_group_run_commit_ordered"); @@ -6029,7 +6006,7 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, else { /* If not leader, just wait until leader wakes us up. */ - group_commit_wait_for_wakeup(&entry); + thd->wait_for_wakeup_ready(); } /* @@ -6181,6 +6158,7 @@ int TC_LOG_MMAP::open(const char *opt_name) pthread_mutex_init(&LOCK_pool, MY_MUTEX_INIT_FAST); pthread_cond_init(&COND_active, 0); pthread_cond_init(&COND_pool, 0); + pthread_cond_init(&COND_queue_busy, 0); inited=6; @@ -6188,6 +6166,8 @@ int TC_LOG_MMAP::open(const char *opt_name) active=pages; pool=pages+1; pool_last=pages+npages-1; + commit_ordered_queue= NULL; + commit_ordered_queue_busy= false; return 0; @@ -6293,7 +6273,7 @@ int TC_LOG_MMAP::overflow() to the position in memory where xid was logged to. */ -int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid) +int TC_LOG_MMAP::log_one_transaction(my_xid xid) { int err; PAGE *p; @@ -6462,6 +6442,8 @@ void TC_LOG_MMAP::close() pthread_mutex_destroy(&LOCK_active); pthread_mutex_destroy(&LOCK_pool); pthread_cond_destroy(&COND_pool); + pthread_cond_destroy(&COND_active); + pthread_cond_destroy(&COND_queue_busy); case 5: data[0]='A'; // garble the first (signature) byte, in case my_delete fails case 4: diff --git a/sql/log.h b/sql/log.h index 863cb188866..37f5462f198 100644 --- a/sql/log.h +++ b/sql/log.h @@ -91,11 +91,6 @@ protected: THD *thd; /* This is the `all' parameter for ha_commit_trans() etc. */ bool all; - /* - Flag set true when it is time for this thread to wake up after group - commit. Used with THD::LOCK_commit_ordered and THD::COND_commit_ordered. - */ - bool group_commit_ready; /* Set by TC_LOG_group_commit::group_log_xid(), to return per-thd error and cookie. @@ -105,9 +100,6 @@ protected: TC_group_commit_entry * reverse_queue(TC_group_commit_entry *queue); - void group_commit_wait_for_wakeup(TC_group_commit_entry *entry); - void group_commit_wakeup_other(TC_group_commit_entry *other); - /* This is a queue of threads waiting for being allowed to commit. Access to the queue must be protected by LOCK_prepare_ordered. @@ -115,36 +107,6 @@ protected: TC_group_commit_entry *group_commit_queue; }; -class TC_LOG_unordered: public TC_LOG_queued -{ -public: - TC_LOG_unordered(); - ~TC_LOG_unordered(); - - int log_and_order(THD *thd, my_xid xid, bool all, - bool need_prepare_ordered, bool need_commit_ordered); - -protected: - virtual int log_xid(THD *thd, my_xid xid)=0; - -private: - /* - This flag and condition is used to reserve the queue while threads in it - each run the commit_ordered() methods one after the other. Only once the - last commit_ordered() in the queue is done can we start on a new queue - run. - - Since we start this process in the first thread in the queue and finish in - the last (and possibly different) thread, we need a condition variable for - this (we cannot unlock a mutex in a different thread than the one who - locked it). - - The condition is used together with the LOCK_prepare_ordered mutex. - */ - my_bool group_commit_queue_busy; - pthread_cond_t COND_queue_busy; -}; - class TC_LOG_group_commit: public TC_LOG_queued { public: @@ -206,18 +168,28 @@ private: pthread_mutex_t LOCK_group_commit; }; -class TC_LOG_DUMMY: public TC_LOG_unordered // use it to disable the logging +class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging { public: TC_LOG_DUMMY() {} int open(const char *opt_name) { return 0; } void close() { } - int log_xid(THD *thd, my_xid xid) { return 1; } + /* + TC_LOG_DUMMY is only used when there are <= 1 XA-capable engines, and we + only use internal XA during commit when >= 2 XA-capable engines + participate. + */ + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, bool need_commit_ordered) + { + DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */); + return 1; + } void unlog(ulong cookie, my_xid xid) { } }; #ifdef HAVE_MMAP -class TC_LOG_MMAP: public TC_LOG_unordered +class TC_LOG_MMAP: public TC_LOG { public: // only to keep Sun Forte on sol9x86 happy typedef enum { @@ -238,6 +210,13 @@ class TC_LOG_MMAP: public TC_LOG_unordered pthread_cond_t cond; // to wait for a sync } PAGE; + /* List of THDs for which to invoke commit_ordered(), in order. */ + struct commit_entry + { + struct commit_entry *next; + THD *thd; + }; + char logname[FN_REFLEN]; File fd; my_off_t file_length; @@ -252,16 +231,38 @@ class TC_LOG_MMAP: public TC_LOG_unordered */ pthread_mutex_t LOCK_active, LOCK_pool, LOCK_sync; pthread_cond_t COND_pool, COND_active; + /* + Queue of threads that need to call commit_ordered(). + Access to this queue must be protected by LOCK_prepare_ordered. + */ + commit_entry *commit_ordered_queue; + /* + This flag and condition is used to reserve the queue while threads in it + each run the commit_ordered() methods one after the other. Only once the + last commit_ordered() in the queue is done can we start on a new queue + run. + + Since we start this process in the first thread in the queue and finish in + the last (and possibly different) thread, we need a condition variable for + this (we cannot unlock a mutex in a different thread than the one who + locked it). + + The condition is used together with the LOCK_prepare_ordered mutex. + */ + my_bool commit_ordered_queue_busy; + pthread_cond_t COND_queue_busy; public: TC_LOG_MMAP(): inited(0) {} int open(const char *opt_name); void close(); - int log_xid(THD *thd, my_xid xid); + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, bool need_commit_ordered); void unlog(ulong cookie, my_xid xid); int recover(); private: + int log_one_transaction(my_xid xid); void get_active_from_pool(); int sync(); int overflow(); diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 7c8ed46347f..563617f0a5b 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -704,8 +704,8 @@ THD::THD() active_vio = 0; #endif pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_FAST); - pthread_cond_init(&COND_commit_ordered, 0); + pthread_mutex_init(&LOCK_wakeup_ready, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_wakeup_ready, 0); /* Variables with default values */ proc_info="login"; @@ -1037,8 +1037,8 @@ THD::~THD() free_root(&transaction.mem_root,MYF(0)); #endif mysys_var=0; // Safety (shouldn't be needed) - pthread_cond_destroy(&COND_commit_ordered); - pthread_mutex_destroy(&LOCK_commit_ordered); + pthread_cond_destroy(&COND_wakeup_ready); + pthread_mutex_destroy(&LOCK_wakeup_ready); pthread_mutex_destroy(&LOCK_thd_data); #ifndef DBUG_OFF dbug_sentry= THD_SENTRY_GONE; @@ -4009,6 +4009,25 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, DBUG_RETURN(0); } +void +THD::wait_for_wakeup_ready() +{ + pthread_mutex_lock(&LOCK_wakeup_ready); + while (!wakeup_ready) + pthread_cond_wait(&COND_wakeup_ready, &LOCK_wakeup_ready); + pthread_mutex_unlock(&LOCK_wakeup_ready); +} + +void +THD::signal_wakeup_ready() +{ + pthread_mutex_lock(&LOCK_wakeup_ready); + wakeup_ready= true; + pthread_cond_signal(&COND_wakeup_ready); + pthread_mutex_unlock(&LOCK_wakeup_ready); +} + + bool Discrete_intervals_list::append(ulonglong start, ulonglong val, ulonglong incr) { diff --git a/sql/sql_class.h b/sql/sql_class.h index ed02504e3ab..618d6a6e089 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1447,10 +1447,6 @@ public: /* container for handler's private per-connection data */ Ha_data ha_data[MAX_HA]; - /* Mutex and condition for waking up threads after group commit. */ - pthread_mutex_t LOCK_commit_ordered; - pthread_cond_t COND_commit_ordered; - #ifndef MYSQL_CLIENT int binlog_setup_trx_data(); @@ -2380,6 +2376,14 @@ public: LEX_STRING get_invoker_user() { return invoker_user; } LEX_STRING get_invoker_host() { return invoker_host; } bool has_invoker() { return invoker_user.length > 0; } + void clear_wakeup_ready() { wakeup_ready= false; } + /* + Sleep waiting for others to wake us up with signal_wakeup_ready(). + Must call clear_wakeup_ready() before waiting. + */ + void wait_for_wakeup_ready(); + /* Wake this thread up from wait_for_wakeup_ready(). */ + void signal_wakeup_ready(); private: /** The current internal error handler for this thread, or NULL. */ Internal_error_handler *m_internal_handler; @@ -2418,6 +2422,16 @@ private: */ LEX_STRING invoker_user; LEX_STRING invoker_host; + /* + Flag, mutex and condition for a thread to wait for a signal from another + thread. + + Currently used to wait for group commit to complete, can also be used for + other purposes. + */ + bool wakeup_ready; + pthread_mutex_t LOCK_wakeup_ready; + pthread_cond_t COND_wakeup_ready; };