/* Copyright (C) 2012 Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <my_global.h>
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>

#ifdef HAVE_POOL_OF_THREADS

#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
#include <time.h>
#include <sql_plist.h>
#include <threadpool.h>
#include <time.h>
#ifdef __linux__
#include <sys/epoll.h>
typedef struct epoll_event native_event;
#elif defined(HAVE_KQUEUE)
#include <sys/event.h>
typedef struct kevent native_event;
#elif defined (__sun)
#include <port.h>
typedef port_event_t native_event;
#else
#error threadpool is not available on this platform
#endif

/** Maximum number of native events a listener can read in one go */
#define MAX_EVENTS 1024

/** Indicates that threadpool was initialized */
static bool threadpool_started= false;

/*
  Define PSI Keys for performance schema.
  We have a mutex per group, worker threads, condition per worker thread,
  and timer thread with its own mutex and condition.
*/

#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
{
  { &key_group_mutex, "group_mutex", 0},
  { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL}
};

static PSI_cond_key key_worker_cond;
static PSI_cond_key key_timer_cond;
static PSI_cond_info cond_list[]=
{
  { &key_worker_cond, "worker_cond", 0},
  { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL}
};

static PSI_thread_key key_worker_thread;
static PSI_thread_key key_timer_thread;
static PSI_thread_info thread_list[] =
{
 {&key_worker_thread, "worker_thread", 0},
 {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
};

/* Macro to simplify performance schema registration */
#define PSI_register(X) \
 if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
#else
#define PSI_register(X) /* no-op */
#endif
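
/*
  Illustration (not a separate code path): given the macro above,
  PSI_register(mutex) expands, when HAVE_PSI_INTERFACE is defined, to

    if(PSI_server) PSI_server->register_mutex("threadpool", mutex_list,
                                              array_elements(mutex_list));

  and analogously for PSI_register(cond) and PSI_register(thread), which is
  how tp_init() below registers the keys declared in this section.
*/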


struct thread_group_t;

/* Per-thread structure for workers */
struct worker_thread_t
{
  ulonglong  event_count; /* number of requests handled by this thread */
  thread_group_t* thread_group;
  worker_thread_t *next_in_list;
  worker_thread_t **prev_in_list;

  mysql_cond_t  cond;
  bool          woken;
};

typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
                 &worker_thread_t::next_in_list,
                 &worker_thread_t::prev_in_list>
                 >
worker_list_t;

struct connection_t
{

  THD *thd;
  thread_group_t *thread_group;
  connection_t *next_in_queue;
  connection_t **prev_in_queue;
  ulonglong abs_wait_timeout;
  bool logged_in;
  bool bound_to_poll_descriptor;
  bool waiting;
};

typedef I_P_List<connection_t,
                     I_P_List_adapter<connection_t,
                                      &connection_t::next_in_queue,
                                      &connection_t::prev_in_queue>,
                     I_P_List_null_counter,
                     I_P_List_fast_push_back<connection_t> >
connection_queue_t;

struct thread_group_t
{
  mysql_mutex_t mutex;
  connection_queue_t queue;
  worker_list_t waiting_threads;
  worker_thread_t *listener;
  pthread_attr_t *pthread_attr;
  int  pollfd;
  int  thread_count;
  int  active_thread_count;
  int  connection_count;
  /* Stats for the deadlock detection timer routine. */
  int io_event_count;
  int queue_event_count;
  ulonglong last_thread_creation_time;
  int  shutdown_pipe[2];
  bool shutdown;
  bool stalled;

} MY_ALIGNED(512);

static thread_group_t *all_groups;
static uint group_count;
static int32 shutdown_group_count;

/**
 Used for printing the "pool blocked" message, see
 print_pool_blocked_message();
*/
static ulonglong pool_block_start;

/* Global timer for all groups */
struct pool_timer_t
{
  mysql_mutex_t mutex;
  mysql_cond_t cond;
  volatile uint64 current_microtime;
  volatile uint64 next_timeout_check;
  int  tick_interval;
  bool shutdown;
};

static pool_timer_t pool_timer;

static void queue_put(thread_group_t *thread_group, connection_t *connection);
static int  wake_thread(thread_group_t *thread_group);
static void handle_event(connection_t *connection);
static int  wake_or_create_thread(thread_group_t *thread_group);
static int  create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void connection_abort(connection_t *connection);
static void set_wait_timeout(connection_t *connection);
static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);

/**
 Asynchronous network IO.

 We use the native edge-triggered network IO multiplexing facility.
 This maps to different APIs on different Unixes.

 Currently supported are Linux with epoll, Solaris with event ports, and
 OSX and BSD with kevent. All those APIs are used with one-shot flags
 (the event is signalled once the client has written something into the
 socket, then the socket is removed from the "poll-set" until the command
 is finished, and we need to re-arm/re-register the socket).

 No implementation for poll/select/AIO is currently provided.

 The API closely resembles all of the above mentioned platform APIs
 and consists of the following functions.

 - io_poll_create()
 Creates an io_poll descriptor.
 On Linux: epoll_create()

 - io_poll_associate_fd(int poll_fd, int fd, void *data)
 Associate a file descriptor with the io poll descriptor.
 On Linux: epoll_ctl(..EPOLL_CTL_ADD)

 - io_poll_disassociate_fd(int pollfd, int fd)
 Disassociate a file descriptor from the io poll descriptor.
 On Linux: epoll_ctl(..EPOLL_CTL_DEL)

 - io_poll_start_read(int poll_fd, int fd, void *data)
 The same as io_poll_associate_fd(), but can only be used after
 io_poll_associate_fd() has been called for the descriptor.
 On Linux: epoll_ctl(..EPOLL_CTL_MOD)

 - io_poll_wait(int pollfd, native_event *native_events, int maxevents,
   int timeout_ms)
 Wait until one or more descriptors added with io_poll_associate_fd()
 or io_poll_start_read() become readable. Data associated with the
 descriptors can be retrieved from the native_events array, using the
 native_event_get_userdata() function.
 On Linux: epoll_wait()
*/
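
/*
  A minimal usage sketch of the wrappers above (illustrative only; fd and
  conn are hypothetical, as is the fd_of() helper):

    int pollfd= io_poll_create();
    io_poll_associate_fd(pollfd, fd, conn);       // first registration
    native_event ev[MAX_EVENTS];
    int cnt= io_poll_wait(pollfd, ev, MAX_EVENTS, -1);
    for (int i= 0; i < cnt; i++)
    {
      connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
      // ... process one request for c, then re-arm the one-shot event:
      io_poll_start_read(pollfd, fd_of(c), c);
    }

  This mirrors how listener()/get_event() and start_io() below use the API.
*/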

#if defined (__linux__)
#ifndef EPOLLRDHUP
/* Early 2.6 kernels did not have EPOLLRDHUP */
#define EPOLLRDHUP 0
#endif
static int io_poll_create()
{
  return epoll_create(1);
}


int io_poll_associate_fd(int pollfd, int fd, void *data)
{
  struct epoll_event ev;
  ev.data.u64= 0; /* Keep valgrind happy */
  ev.data.ptr= data;
  ev.events=  EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
  return epoll_ctl(pollfd, EPOLL_CTL_ADD,  fd, &ev);
}


int io_poll_start_read(int pollfd, int fd, void *data)
{
  struct epoll_event ev;
  ev.data.u64= 0; /* Keep valgrind happy */
  ev.data.ptr= data;
  ev.events=  EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
  return epoll_ctl(pollfd, EPOLL_CTL_MOD,  fd, &ev);
}

int io_poll_disassociate_fd(int pollfd, int fd)
{
  struct epoll_event ev;
  return epoll_ctl(pollfd, EPOLL_CTL_DEL,  fd, &ev);
}


/*
 Wrapper around epoll_wait.
 NOTE - in case of EINTR, it restarts with the original timeout. Since we use
 either infinite or 0 timeouts, this is not critical.
*/
int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
              int timeout_ms)
{
  int ret;
  do
  {
    ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
  }
  while(ret == -1 && errno == EINTR);
  return ret;
}


static void *native_event_get_userdata(native_event *event)
{
  return event->data.ptr;
}

#elif defined(HAVE_KQUEUE)

/*
  NetBSD is incompatible with the other BSDs: the last parameter of the EV_SET
  macro (udata, user data) needs to be intptr_t, whereas it needs to be void *
  everywhere else.
*/

#ifdef __NetBSD__
#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, (intptr_t)g)
#else
#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, g)
#endif


int io_poll_create()
{
  return kqueue();
}

int io_poll_start_read(int pollfd, int fd, void *data)
{
  struct kevent ke;
  MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
         0, 0, data);
  return kevent(pollfd, &ke, 1, 0, 0, 0);
}


int io_poll_associate_fd(int pollfd, int fd, void *data)
{
  struct kevent ke;
  MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
         0, 0, data);
  return io_poll_start_read(pollfd,fd, data);
}


int io_poll_disassociate_fd(int pollfd, int fd)
{
  struct kevent ke;
  MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
  return kevent(pollfd, &ke, 1, 0, 0, 0);
}


int io_poll_wait(int pollfd, struct kevent *events, int maxevents, int timeout_ms)
{
  struct timespec ts;
  int ret;
  if (timeout_ms >= 0)
  {
    ts.tv_sec= timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
  }
  do
  {
    ret= kevent(pollfd, 0, 0, events, maxevents,
               (timeout_ms >= 0)?&ts:NULL);
  }
  while (ret == -1 && errno == EINTR);
  return ret;
}

static void* native_event_get_userdata(native_event *event)
{
  return (void *)event->udata;
}

#elif defined (__sun)

static int io_poll_create()
{
  return port_create();
}

int io_poll_start_read(int pollfd, int fd, void *data)
{
  return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}

static int io_poll_associate_fd(int pollfd, int fd, void *data)
{
  return io_poll_start_read(pollfd, fd, data);
}

int io_poll_disassociate_fd(int pollfd, int fd)
{
  return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
}

int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
{
  struct timespec ts;
  int ret;
  uint_t nget= 1;
  if (timeout_ms >= 0)
  {
    ts.tv_sec= timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
  }
  do
  {
    ret= port_getn(pollfd, events, maxevents, &nget,
            (timeout_ms >= 0)?&ts:NULL);
  }
  while (ret == -1 && errno == EINTR);
  DBUG_ASSERT(nget < INT_MAX);
  return (int)nget;
}

static void* native_event_get_userdata(native_event *event)
{
  return event->portev_user;
}
#endif


/* Dequeue an element from the work queue */

static connection_t *queue_get(thread_group_t *thread_group)
{
  DBUG_ENTER("queue_get");
  thread_group->queue_event_count++;
  connection_t *c= thread_group->queue.front();
  if (c)
  {
    thread_group->queue.remove(c);
  }
  DBUG_RETURN(c);
}


/*
  Handle wait timeout:
  Find connections that have been idle for too long and kill them.
  Also, recalculate the time when the next timeout check should run.
*/

static void timeout_check(pool_timer_t *timer)
{
  DBUG_ENTER("timeout_check");

  mysql_mutex_lock(&LOCK_thread_count);
  I_List_iterator<THD> it(threads);

  /* Reset next timeout check, it will be recalculated in the loop below */
  my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);

  THD *thd;
  while ((thd=it++))
  {
    if (thd->net.reading_or_writing != 1)
      continue;

    connection_t *connection= (connection_t *)thd->event_scheduler.data;
    if (!connection)
    {
      /*
        Connection does not have scheduler data. This happens for example
        if the THD belongs to a different scheduler, one that is listening
        on extra_port.
      */
      continue;
    }

    if(connection->abs_wait_timeout < timer->current_microtime)
    {
      /* Wait timeout exceeded, kill connection. */
      mysql_mutex_lock(&thd->LOCK_thd_data);
      thd->killed = KILL_CONNECTION;
      post_kill_notification(thd);
      mysql_mutex_unlock(&thd->LOCK_thd_data);
    }
    else
    {
      set_next_timeout_check(connection->abs_wait_timeout);
    }
  }
  mysql_mutex_unlock(&LOCK_thread_count);
  DBUG_VOID_RETURN;
}


/*
  Timer thread.

  Periodically checks whether one of the thread groups is stalled. A stall
  happens if events are not being dequeued from the queue or from the network.
  The primary reason for a stall is a long-running request that does not
  yield. It can also happen that a thread is waiting but wait_begin/wait_end
  was forgotten by the storage engine. The timer thread creates a new thread
  in a group in case of a stall.

  Besides checking for stalls, the timer thread is also responsible for
  terminating clients that have been idle for longer than wait_timeout
  seconds.

  TODO: Let the timer sleep for a long time if there is no work to be done.
  Currently it wakes up rather often on an idle server.
*/

static void* timer_thread(void *param)
{
  uint i;
  pool_timer_t* timer=(pool_timer_t *)param;

  my_thread_init();
  DBUG_ENTER("timer_thread");
  timer->next_timeout_check= ULONGLONG_MAX;
  timer->current_microtime= microsecond_interval_timer();

  for(;;)
  {
    struct timespec ts;
    int err;

    set_timespec_nsec(ts,timer->tick_interval*1000000);
    mysql_mutex_lock(&timer->mutex);
    err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
    if (timer->shutdown)
    {
      mysql_mutex_unlock(&timer->mutex);
      break;
    }
    if (err == ETIMEDOUT)
    {
      timer->current_microtime= microsecond_interval_timer();

      /* Check stalls in thread groups */
      for (i= 0; i < threadpool_max_size; i++)
      {
        if(all_groups[i].connection_count)
           check_stall(&all_groups[i]);
      }

      /* Check if any client exceeded wait_timeout */
      if (timer->next_timeout_check <= timer->current_microtime)
        timeout_check(timer);
    }
    mysql_mutex_unlock(&timer->mutex);
  }

  mysql_mutex_destroy(&timer->mutex);
  my_thread_end();
  return NULL;
}



void check_stall(thread_group_t *thread_group)
{
  if (mysql_mutex_trylock(&thread_group->mutex) != 0)
  {
    /* Something is happening, don't disturb */
    return;
  }

  /*
    Check if a listener is present. If not, check whether any IO
    events were dequeued since last time. If not, this means the
    listener is either in a tight loop or thd_wait_begin()
    was forgotten. Create a new worker (it will make itself the listener).
  */
  if (!thread_group->listener && !thread_group->io_event_count)
  {
    wake_or_create_thread(thread_group);
    mysql_mutex_unlock(&thread_group->mutex);
    return;
  }

  /* Reset io event count */
  thread_group->io_event_count= 0;

  /*
    Check whether requests from the workqueue are being dequeued.

    The stall detection and resolution works as follows:

    1. There is a counter thread_group->queue_event_count for the number of
       events removed from the queue. The timer resets the counter to 0 on
       each run.
    2. The timer determines a stall if this counter remained 0 since the last
       check and the queue is not empty.
    3. Once the timer determines a stall it sets the thread_group->stalled
       flag and wakes an idle worker (or creates a new one, subject to
       throttling).
    4. The stalled flag is reset when an event is dequeued.

    Q : Will this handling lead to an unbounded growth of threads, if the
    queue stalls permanently?
    A : No. If the queue stalls permanently, it is an indication of many very
    long simultaneous queries. The maximum number of simultaneous queries is
    max_connections; further, we have the threadpool_max_threads limit, above
    which no worker threads are created. So in case there is a flood of very
    long queries, the threadpool would slowly approach thread-per-connection
    behavior.
    NOTE:
    If long queries never wait, creation of the new threads is done by the
    timer, so it is slower than in real thread-per-connection. However, if
    long queries do wait and indicate that via thd_wait_begin/end callbacks,
    thread creation will be faster.
  */
  if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)
  {
    thread_group->stalled= true;
    wake_or_create_thread(thread_group);
  }

  /* Reset queue event count */
  thread_group->queue_event_count= 0;

  mysql_mutex_unlock(&thread_group->mutex);
}
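
/*
  Worked example of the mechanism above (numbers are hypothetical): with
  thread_pool_stall_limit= 500, the timer fires every 500ms. If at some tick
  queue_event_count is still 0 while the queue holds connections, every
  currently running worker has spent the whole interval on its request, so
  the group is flagged stalled and wake_or_create_thread() adds capacity.
  The flag is cleared again once a worker dequeues an event in get_event().
*/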


static void start_timer(pool_timer_t* timer)
{
  pthread_t thread_id;
  DBUG_ENTER("start_timer");
  mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
  mysql_cond_init(key_timer_cond, &timer->cond, NULL);
  timer->shutdown = false;
  mysql_thread_create(key_timer_thread,&thread_id, NULL, timer_thread, timer);
  DBUG_VOID_RETURN;
}


static void stop_timer(pool_timer_t *timer)
{
  DBUG_ENTER("stop_timer");
  mysql_mutex_lock(&timer->mutex);
  timer->shutdown = true;
  mysql_cond_signal(&timer->cond);
  mysql_mutex_unlock(&timer->mutex);
  DBUG_VOID_RETURN;
}


/**
  Poll for socket events and distribute them to worker threads.
  In many cases the current thread will handle a single event itself.

  @return a ready connection, or NULL on shutdown
*/
static connection_t * listener(worker_thread_t *current_thread,
                               thread_group_t *thread_group)
{
  DBUG_ENTER("listener");
  connection_t *retval= NULL;

  for(;;)
  {
    native_event ev[MAX_EVENTS];
    int cnt;

    if (thread_group->shutdown)
      break;

    cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);

    if (cnt <=0)
    {
      DBUG_ASSERT(thread_group->shutdown);
      break;
    }

    mysql_mutex_lock(&thread_group->mutex);

    if (thread_group->shutdown)
    {
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    thread_group->io_event_count += cnt;

    /*
     We got some network events and need to make decisions: whether the
     listener should handle the events itself, and whether or not to wake any
     worker threads so they can handle events.

     Q1: Should the listener handle an event itself, or put all events into
     the queue and let workers handle them?

     Solution:
     Generally, a listener that handles events itself is preferable. We do not
     want the listener thread to change its state from waiting to running too
     often. Since the listener has just woken from poll, it better uses its
     time slice and does some work. Besides, not handling events means they go
     to the queue, and then another worker often has to wake up to handle the
     event. This is not good, as we want to avoid wakeups.

     The downside of a listener that also handles queries is that we can
     potentially leave the thread group for a long time without picking up new
     network events. This is not a major problem, because the stall will be
     detected sooner or later by the timer thread. Still, relying on the timer
     is not always good, because it may "tick" too slowly (large
     timer_interval).

     We use the following strategy to solve this problem: if the queue was not
     empty, we suspect a flood of network events and the listener stays.
     Otherwise, it handles a query.


     Q2: If the queue is not empty, how many workers to wake?

     Solution:
     We generally try to keep one thread per group active (threads handling
     queries are considered active, unless they are stuck inside some "wait").
     Thus, we will wake only one worker, and only if there are no active
     threads currently, and the listener is not going to handle a query. When
     we don't wake, we hope that the currently active threads will finish fast
     and handle the queue. If this does not happen, the timer thread will
     detect a stall and wake a worker.

     NOTE: Currently nothing is done to detect or prevent long queuing times.
     A solution for the future would be to give up the "one active thread per
     group" principle, if events stay in the queue for too long, and just wake
     more workers.
    */

    bool listener_picks_event= thread_group->queue.is_empty();

    /*
      If listener_picks_event is set, the listener thread will handle the
      first event, and put the rest into the queue. If listener_picks_event
      is not set, all events go to the queue.
    */
    for(int i=(listener_picks_event)?1:0; i < cnt ; i++)
    {
      connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
      thread_group->queue.push_back(c);
    }

    if (listener_picks_event)
    {
      /* Handle the first event. */
      retval= (connection_t *)native_event_get_userdata(&ev[0]);
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    if(thread_group->active_thread_count==0)
    {
      /* We added some work items to the queue, now wake a worker. */
      if(wake_thread(thread_group))
      {
        /*
          Wake failed, hence the group has no idle threads. Now check if there
          are any threads in the group except the listener.
        */
        if(thread_group->thread_count == 1)
        {
           /*
             Currently there is no worker thread in the group, as indicated by
             thread_count == 1 (this means the listener is the only thread in
             the group).
             The queue is not empty, and the listener is not going to handle
             events. In order to drain the queue, we create a worker here.
             Alternatively, we could just rely on the timer to detect the
             stall and create a thread, but waiting for the timer would be an
             inefficient and pointless delay.
           */
           create_worker(thread_group);
        }
      }
    }
    mysql_mutex_unlock(&thread_group->mutex);
  }

  DBUG_RETURN(retval);
}
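
/*
  Compact restatement of the listener policy above (pseudocode sketch, not a
  separate code path):

    after io_poll_wait() returns cnt events:
      if queue was empty  -> listener keeps ev[0], queues ev[1..cnt-1]
      else                -> queue all events;
                             if active_thread_count == 0:
                               wake_thread(), or create_worker() when the
                               listener is the only thread in the group
*/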

/**
  Adjust thread counters in the group and globally
  whenever a thread is created or is about to exit.

  @param thread_group
  @param count -  1, when a new thread is created
                 -1, when a thread is about to exit
*/

static void add_thread_count(thread_group_t *thread_group, int32 count)
{
  thread_group->thread_count += count;
  /* workers start out and end in the "active" state */
  thread_group->active_thread_count += count;
  my_atomic_add32(&tp_stats.num_worker_threads, count);
}


/**
  Creates a new worker thread.
  thread_mutex must be held when calling this function.

  NOTE: in rare cases, the number of threads can exceed
  threadpool_max_threads, because we need at least 2 threads
  per group to prevent deadlocks (one listener + one worker)
*/

static int create_worker(thread_group_t *thread_group)
{
  pthread_t thread_id;
  bool max_threads_reached= false;
  int err;

  DBUG_ENTER("create_worker");
  if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
     && thread_group->thread_count >= 2)
  {
    err= 1;
    max_threads_reached= true;
    goto end;
  }


  err= mysql_thread_create(key_worker_thread, &thread_id,
         thread_group->pthread_attr, worker_main, thread_group);
  if (!err)
  {
    thread_group->last_thread_creation_time=microsecond_interval_timer();
    thread_created++;
    add_thread_count(thread_group, 1);
  }
  else
  {
    my_errno= errno;
  }

end:
  if (err)
    print_pool_blocked_message(max_threads_reached);
  else
    pool_block_start= 0; /* Reset pool blocked timer, if it was set */

  DBUG_RETURN(err);
}


/**
 Calculate the microsecond throttling delay for thread creation.

 The value depends on how many threads are already in the group:
 small number of threads means no delay, the more threads the larger
 the delay.

 The actual values were not calculated using any scientific methods.
 They just look right, and behave well in practice.

 TODO: Should throttling depend on thread_pool_stall_limit?
*/
static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
{
  int count= thread_group->thread_count;

  if (count < 4)
    return 0;

  if (count < 8)
    return 50*1000;

  if(count < 16)
    return 100*1000;

  return 200*1000;
}


/**
  Wakes a worker thread, or creates a new one.

  Worker creation is throttled, so that we avoid creating too many threads
  in a short period of time.
*/
static int wake_or_create_thread(thread_group_t *thread_group)
{
  DBUG_ENTER("wake_or_create_thread");

  if (thread_group->shutdown)
   DBUG_RETURN(0);

  if (wake_thread(thread_group) == 0)
    DBUG_RETURN(0);

  if (thread_group->thread_count > thread_group->connection_count)
    DBUG_RETURN(-1);


  if (thread_group->active_thread_count == 0)
  {
    /*
     We're better off creating a new thread here with no delay: either there
     are no workers at all, or they are all blocking and there was no
     idle thread to wake up. Smells like a potential deadlock or very slowly
     executing requests, e.g. sleeps or user locks.
    */
    DBUG_RETURN(create_worker(thread_group));
  }

  ulonglong now = microsecond_interval_timer();
  ulonglong time_since_last_thread_created =
    (now - thread_group->last_thread_creation_time);

  /* Throttle thread creation. */
  if (time_since_last_thread_created >
       microsecond_throttling_interval(thread_group))
  {
    DBUG_RETURN(create_worker(thread_group));
  }

  DBUG_RETURN(-1);
}



int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
{
  DBUG_ENTER("thread_group_init");
  thread_group->pthread_attr = thread_attr;
  mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
  thread_group->pollfd= -1;
  thread_group->shutdown_pipe[0]= -1;
  thread_group->shutdown_pipe[1]= -1;
  thread_group->queue.empty();
  DBUG_RETURN(0);
}


void thread_group_destroy(thread_group_t *thread_group)
{
  mysql_mutex_destroy(&thread_group->mutex);
  if (thread_group->pollfd != -1)
  {
    close(thread_group->pollfd);
    thread_group->pollfd= -1;
  }
  for(int i=0; i < 2; i++)
  {
    if(thread_group->shutdown_pipe[i] != -1)
    {
      close(thread_group->shutdown_pipe[i]);
      thread_group->shutdown_pipe[i]= -1;
    }
  }
  if (my_atomic_add32(&shutdown_group_count, -1) == 1)
    my_free(all_groups);
}

/**
  Wake a sleeping thread from the waiting list.
*/

static int wake_thread(thread_group_t *thread_group)
{
  DBUG_ENTER("wake_thread");
  worker_thread_t *thread = thread_group->waiting_threads.front();
  if(thread)
  {
    thread->woken= true;
    thread_group->waiting_threads.remove(thread);
    mysql_cond_signal(&thread->cond);
    DBUG_RETURN(0);
  }
  DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
}


/**
  Initiate shutdown for a thread group.

  The shutdown is asynchronous: here we only need to wake all threads so they
  can finish. We do not wait until the threads terminate. Final cleanup
  of the group (thread_group_destroy) will be done by the last exiting thread.
*/

static void thread_group_close(thread_group_t *thread_group)
{
  DBUG_ENTER("thread_group_close");

  mysql_mutex_lock(&thread_group->mutex);
  if (thread_group->thread_count == 0)
  {
    mysql_mutex_unlock(&thread_group->mutex);
    thread_group_destroy(thread_group);
    DBUG_VOID_RETURN;
  }

  thread_group->shutdown= true;
  thread_group->listener= NULL;

  if (pipe(thread_group->shutdown_pipe))
  {
    DBUG_VOID_RETURN;
  }

  /* Wake the listener */
  if (io_poll_associate_fd(thread_group->pollfd,
      thread_group->shutdown_pipe[0], NULL))
  {
    DBUG_VOID_RETURN;
  }
  char c= 0;
  if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
    DBUG_VOID_RETURN;

  /* Wake all workers. */
  while(wake_thread(thread_group) == 0)
  {
  }

  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_VOID_RETURN;
}


/*
  Add work to the queue. Maybe wake a worker if they all sleep.

  Currently, this function is only used when new connections need to
  perform login (this is done in worker threads).
*/

static void queue_put(thread_group_t *thread_group, connection_t *connection)
{
  DBUG_ENTER("queue_put");

  mysql_mutex_lock(&thread_group->mutex);
  thread_group->queue.push_back(connection);

  if (thread_group->active_thread_count == 0)
    wake_or_create_thread(thread_group);

  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_VOID_RETURN;
}


/*
  Prevent too many threads executing at the same time, if the workload is
  not CPU bound.
*/

static bool too_many_threads(thread_group_t *thread_group)
{
  return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe
   && !thread_group->stalled);
}
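
/*
  For example (the value below is only an assumption, check your build's
  default): with threadpool_oversubscribe= 3, a group counts as "too busy"
  once 4 or more of its threads are actively executing, unless the group was
  flagged as stalled, in which case extra threads are allowed in order to
  drain the queue.
*/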


/**
  Retrieve a connection with a pending event.

  A pending event in our case means that there is either a pending login
  request (if the connection is not yet logged in), or there are unread bytes
  on the socket.

  If there are no pending events currently, the thread will wait.
  If the timeout specified in the abstime parameter passes, the function
  returns NULL.

  @param current_thread - current worker thread
  @param thread_group - current thread group
  @param abstime - absolute wait timeout

  @return
  connection with pending event.
  NULL is returned if the timeout has expired, or on shutdown.
*/

connection_t *get_event(worker_thread_t *current_thread,
  thread_group_t *thread_group,  struct timespec *abstime)
{
  DBUG_ENTER("get_event");
  connection_t *connection = NULL;
  int err=0;

  mysql_mutex_lock(&thread_group->mutex);
  DBUG_ASSERT(thread_group->active_thread_count >= 0);

  for(;;)
  {
    bool oversubscribed = too_many_threads(thread_group);
    if (thread_group->shutdown)
     break;

    /* Check if the queue is not empty */
    if (!oversubscribed)
    {
      connection = queue_get(thread_group);
      if(connection)
        break;
    }

    /* If there is currently no listener in the group, become one. */
    if(!thread_group->listener)
    {
      thread_group->listener= current_thread;
      thread_group->active_thread_count--;
      mysql_mutex_unlock(&thread_group->mutex);

      connection = listener(current_thread, thread_group);

      mysql_mutex_lock(&thread_group->mutex);
      thread_group->active_thread_count++;
      /* There is no listener anymore, it just returned. */
      thread_group->listener= NULL;
      break;
    }

    /*
      The last thing we try before going to sleep is to
      pick a single event via epoll, without waiting (timeout 0)
    */
    if (!oversubscribed)
    {
      native_event nev;
      if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
      {
        thread_group->io_event_count++;
        connection = (connection_t *)native_event_get_userdata(&nev);
        break;
      }
    }

    /* And now, finally sleep */
    current_thread->woken = false; /* wake() sets this to true */

    /*
      Add the current thread to the head of the waiting list and wait.
      It is important to add the thread to the head rather than the tail,
      as it ensures LIFO wakeup order (hot caches, working inactivity timeout)
    */
    thread_group->waiting_threads.push_front(current_thread);

    thread_group->active_thread_count--;
    if (abstime)
    {
      err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
                                 abstime);
    }
    else
    {
      err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
    }
    thread_group->active_thread_count++;

    if (!current_thread->woken)
    {
      /*
        The thread was not signalled via wake(), it might be a spurious wakeup
        or a timeout. Anyhow, we need to remove ourselves from the list now.
        If the thread was explicitly woken, then the caller removed us from
        the list.
      */
      thread_group->waiting_threads.remove(current_thread);
    }

    if (err)
      break;
  }

  thread_group->stalled= false;
  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_RETURN(connection);
}


/**
  Tells the pool that a worker starts waiting on IO, lock, condition,
  sleep() or similar.
*/

void wait_begin(thread_group_t *thread_group)
{
  DBUG_ENTER("wait_begin");
  mysql_mutex_lock(&thread_group->mutex);
  thread_group->active_thread_count--;

  DBUG_ASSERT(thread_group->active_thread_count >=0);
  DBUG_ASSERT(thread_group->connection_count > 0);

  if ((thread_group->active_thread_count == 0) &&
     (thread_group->queue.is_empty() || !thread_group->listener))
  {
    /*
      The group might stall while this thread waits, thus wake
      or create a worker to prevent the stall.
    */
    wake_or_create_thread(thread_group);
  }

  mysql_mutex_unlock(&thread_group->mutex);
  DBUG_VOID_RETURN;
}

/**
  Tells the pool that a worker has finished waiting.
*/

void wait_end(thread_group_t *thread_group)
{
  DBUG_ENTER("wait_end");
  mysql_mutex_lock(&thread_group->mutex);
  thread_group->active_thread_count++;
  mysql_mutex_unlock(&thread_group->mutex);
  DBUG_VOID_RETURN;
}


/**
  Allocate/initialize a new connection structure.
*/

connection_t *alloc_connection(THD *thd)
{
  DBUG_ENTER("alloc_connection");

  connection_t* connection = (connection_t *)my_malloc(sizeof(connection_t),0);
  if (connection)
  {
    connection->thd = thd;
    connection->waiting= false;
    connection->logged_in= false;
    connection->bound_to_poll_descriptor= false;
    connection->abs_wait_timeout= ULONGLONG_MAX;
  }
  DBUG_RETURN(connection);
}



/**
  Add a new connection to the thread pool.
*/

void tp_add_connection(THD *thd)
{
  DBUG_ENTER("tp_add_connection");

  threads.append(thd);
  mysql_mutex_unlock(&LOCK_thread_count);
  connection_t *connection= alloc_connection(thd);
  if (connection)
  {
    thd->event_scheduler.data= connection;

    /* Assign the connection to a group. */
    thread_group_t *group=
      &all_groups[thd->thread_id%group_count];

    connection->thread_group=group;

    mysql_mutex_lock(&group->mutex);
    group->connection_count++;
    mysql_mutex_unlock(&group->mutex);

    /*
       Add the connection to the work queue. The actual logon
       will be done by a worker thread.
    */
    queue_put(group, connection);
  }
  else
  {
    /* Allocation failed */
    threadpool_remove_connection(thd);
  }
  DBUG_VOID_RETURN;
}


/**
  Terminate connection.
*/

static void connection_abort(connection_t *connection)
{
  DBUG_ENTER("connection_abort");
  thread_group_t *group= connection->thread_group;

  threadpool_remove_connection(connection->thd);

  mysql_mutex_lock(&group->mutex);
  group->connection_count--;
  mysql_mutex_unlock(&group->mutex);

  my_free(connection);
  DBUG_VOID_RETURN;
}


/**
  MySQL scheduler callback: wait begin
*/

void tp_wait_begin(THD *thd, int type)
{
  DBUG_ENTER("tp_wait_begin");
  DBUG_ASSERT(thd);
  connection_t *connection = (connection_t *)thd->event_scheduler.data;
  if (connection)
  {
    DBUG_ASSERT(!connection->waiting);
    connection->waiting= true;
    wait_begin(connection->thread_group);
  }
  DBUG_VOID_RETURN;
}


/**
  MySQL scheduler callback: wait end
*/

void tp_wait_end(THD *thd)
{
  DBUG_ENTER("tp_wait_end");
  DBUG_ASSERT(thd);

  connection_t *connection = (connection_t *)thd->event_scheduler.data;
  if (connection)
  {
    DBUG_ASSERT(connection->waiting);
    connection->waiting = false;
    wait_end(connection->thread_group);
  }
  DBUG_VOID_RETURN;
}


static void set_next_timeout_check(ulonglong abstime)
{
  DBUG_ENTER("set_next_timeout_check");
  while(abstime < pool_timer.next_timeout_check)
  {
    longlong old= (longlong)pool_timer.next_timeout_check;
    my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
          &old, abstime);
  }
  DBUG_VOID_RETURN;
}


/**
  Set wait timeout for connection.
*/

static void set_wait_timeout(connection_t *c)
{
  DBUG_ENTER("set_wait_timeout");
  /*
    Calculate the wait deadline for this connection.
    Instead of using microsecond_interval_timer() which has a syscall
    overhead, use pool_timer.current_microtime and take
    into account that its value could be off by at most
    one tick interval.
  */

  c->abs_wait_timeout= pool_timer.current_microtime +
    1000LL*pool_timer.tick_interval +
    1000000LL*c->thd->variables.net_wait_timeout;

  set_next_timeout_check(c->abs_wait_timeout);
  DBUG_VOID_RETURN;
}
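
/*
  Worked example of the deadline above (illustrative values): with
  tick_interval= 500 (ms) and net_wait_timeout= 28800 (s), the deadline is
  current_microtime + 500,000 usec of possible timer skew + 28,800,000,000
  usec of allowed idle time. timeout_check() then kills the connection once
  current_microtime passes abs_wait_timeout.
*/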



/**
  Handle a (rare) special case, where a connection needs to
  migrate to a different group because group_count has changed
  after the thread_pool_size setting.
*/

static int change_group(connection_t *c,
 thread_group_t *old_group,
 thread_group_t *new_group)
{
  int ret= 0;
  int fd= mysql_socket_getfd(c->thd->net.vio->mysql_socket);

  DBUG_ASSERT(c->thread_group == old_group);

  /* Remove connection from the old group. */
  mysql_mutex_lock(&old_group->mutex);
  if (c->bound_to_poll_descriptor)
  {
    io_poll_disassociate_fd(old_group->pollfd,fd);
    c->bound_to_poll_descriptor= false;
  }
  c->thread_group->connection_count--;
  mysql_mutex_unlock(&old_group->mutex);

  /* Add connection to the new group. */
  mysql_mutex_lock(&new_group->mutex);
  c->thread_group= new_group;
  new_group->connection_count++;
  /* Ensure that there is a listener in the new group. */
  if (!new_group->thread_count)
    ret= create_worker(new_group);
  mysql_mutex_unlock(&new_group->mutex);
  return ret;
}


static int start_io(connection_t *connection)
{
  int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket);

  /*
    Usually, a connection will stay in the same group for its entire
    life. However, we do allow group_count to change at runtime, which
    means that in rare cases, when the count changes, a connection may
    need to migrate to another group to keep the load between groups equal.

    So we recalculate in which group the connection should be, based
    on thread_id and the current group count, and migrate if necessary.
  */
  thread_group_t *group =
    &all_groups[connection->thd->thread_id%group_count];

  if (group != connection->thread_group)
  {
    if (change_group(connection, connection->thread_group, group))
      return -1;
  }

  /*
    Bind to the poll descriptor if not yet done.
  */
  if (!connection->bound_to_poll_descriptor)
  {
    connection->bound_to_poll_descriptor= true;
    return io_poll_associate_fd(group->pollfd, fd, connection);
  }

  return io_poll_start_read(group->pollfd, fd, connection);
}



static void handle_event(connection_t *connection)
{

  DBUG_ENTER("handle_event");
  int err;

  if (!connection->logged_in)
  {
    err= threadpool_add_connection(connection->thd);
    connection->logged_in= true;
  }
  else
  {
    err= threadpool_process_request(connection->thd);
  }

  if(err)
    goto end;

  set_wait_timeout(connection);
  err= start_io(connection);

end:
  if (err)
    connection_abort(connection);

  DBUG_VOID_RETURN;
}



/**
  Worker thread's main
*/

static void *worker_main(void *param)
{

  worker_thread_t this_thread;
  pthread_detach_this_thread();
  my_thread_init();

  DBUG_ENTER("worker_main");

  thread_group_t *thread_group = (thread_group_t *)param;

  /* Init per-thread structure */
  mysql_cond_init(key_worker_cond, &this_thread.cond, NULL);
  this_thread.thread_group= thread_group;
  this_thread.event_count=0;

  /* Run event loop */
  for(;;)
  {
    connection_t *connection;
    struct timespec ts;
    set_timespec(ts,threadpool_idle_timeout);
    connection = get_event(&this_thread, thread_group, &ts);
    if (!connection)
      break;
    this_thread.event_count++;
    handle_event(connection);
  }

  /* Thread shutdown: cleanup per-worker-thread structure. */
  mysql_cond_destroy(&this_thread.cond);

  bool last_thread;                    /* last thread in group exits */
  mysql_mutex_lock(&thread_group->mutex);
  add_thread_count(thread_group, -1);
  last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown);
  mysql_mutex_unlock(&thread_group->mutex);

  /* Last thread in group exits and pool is terminating, destroy group.*/
  if (last_thread)
    thread_group_destroy(thread_group);

  my_thread_end();
  return NULL;
}


bool tp_init()
{
  DBUG_ENTER("tp_init");
  threadpool_max_size= MY_MAX(threadpool_size, 128);
  all_groups= (thread_group_t *)
    my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
  if (!all_groups)
  {
    threadpool_max_size= 0;
    DBUG_RETURN(1);
  }
  threadpool_started= true;
  scheduler_init();

  for (uint i= 0; i < threadpool_max_size; i++)
  {
    thread_group_init(&all_groups[i], get_connection_attrib());
  }
  tp_set_threadpool_size(threadpool_size);
  if(group_count == 0)
  {
    /* Something went wrong */
    sql_print_error("Can't set threadpool size to %d",threadpool_size);
    DBUG_RETURN(1);
  }
  PSI_register(mutex);
  PSI_register(cond);
  PSI_register(thread);

  pool_timer.tick_interval= threadpool_stall_limit;
  start_timer(&pool_timer);
  DBUG_RETURN(0);
}


void tp_end()
{
  DBUG_ENTER("tp_end");

  if (!threadpool_started)
    DBUG_VOID_RETURN;

  stop_timer(&pool_timer);
  shutdown_group_count= threadpool_max_size;
  for (uint i= 0; i < threadpool_max_size; i++)
  {
    thread_group_close(&all_groups[i]);
  }
  threadpool_started= false;
  DBUG_VOID_RETURN;
}


/** Ensure that poll descriptors are created when threadpool_size changes */

void tp_set_threadpool_size(uint size)
{
  bool success= true;
  if (!threadpool_started)
    return;

  for(uint i=0; i< size; i++)
  {
    thread_group_t *group= &all_groups[i];
    mysql_mutex_lock(&group->mutex);
    if (group->pollfd == -1)
    {
      group->pollfd= io_poll_create();
      success= (group->pollfd >= 0);
      if(!success)
        sql_print_error("io_poll_create() failed, errno=%d\n", errno);
    }
    mysql_mutex_unlock(&group->mutex);
    if (!success)
    {
      group_count= i;
      return;
    }
  }
  group_count= size;
}

void tp_set_threadpool_stall_limit(uint limit)
{
  if (!threadpool_started)
    return;
  mysql_mutex_lock(&(pool_timer.mutex));
  pool_timer.tick_interval= limit;
  mysql_mutex_unlock(&(pool_timer.mutex));
  mysql_cond_signal(&(pool_timer.cond));
}


/**
 Calculate the number of idle/waiting threads in the pool.

 Sum idle threads over all groups.
 Don't do any locking, it is not required for stats.
*/

int tp_get_idle_thread_count()
{
  int sum=0;
  for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++)
  {
    sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
  }
  return sum;
}


/* Report threadpool problems */

/**
   Delay in microseconds, after which the "pool blocked" message is printed.
   (30 sec == 30,000,000 usec)
*/
#define BLOCK_MSG_DELAY 30*1000000

#define MAX_THREADS_REACHED_MSG \
"Threadpool could not create additional thread to handle queries, because the \
number of allowed threads was reached. Increasing 'thread_pool_max_threads' \
parameter can help in this situation.\n \
If 'extra_port' parameter is set, you can still connect to the database with \
superuser account (it must be TCP connection using extra_port as TCP port) \
and troubleshoot the situation. \
A likely cause of pool blocks are clients that lock resources for long time. \
'show processlist' or 'show engine innodb status' can give additional hints."

#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."

/**
 Write a message when a blocking situation in the threadpool occurs.
 The message is written only when the pool has been blocked for
 BLOCK_MSG_DELAY (30) seconds. It will be just a single message for each
 blocking situation (to prevent log flood).
*/

static void print_pool_blocked_message(bool max_threads_reached)
{
  ulonglong now;
  static bool msg_written;

  now= microsecond_interval_timer();
  if (pool_block_start == 0)
  {
    pool_block_start= now;
    msg_written = false;
    return;
  }

  if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
  {
    if (max_threads_reached)
      sql_print_error(MAX_THREADS_REACHED_MSG);
    else
      sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno);

    sql_print_information("Threadpool has been blocked for %u seconds\n",
      (uint)((now- pool_block_start)/1000000));
    /* avoid repeated messages for the same blocking situation */
    msg_written= true;
  }
}

#endif /* HAVE_POOL_OF_THREADS */