diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index 2badf5e5d09..94d7465d39b 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -934,7 +934,7 @@ lower-case-table-names 1 master-info-file master.info master-retry-count 86400 master-verify-checksum FALSE -max-allowed-packet 8388608 +max-allowed-packet 1048576 max-binlog-cache-size 18446744073709547520 max-binlog-size 1073741824 max-binlog-stmt-cache-size 18446744073709547520 @@ -945,7 +945,7 @@ max-error-count 64 max-heap-table-size 16777216 max-join-size 18446744073709551615 max-length-for-sort-data 1024 -max-long-data-size 8388608 +max-long-data-size 1048576 max-prepared-stmt-count 16382 max-relay-log-size 0 max-seeks-for-key 18446744073709551615 diff --git a/sql/sql_class.cc b/sql/sql_class.cc index fc1c05c236f..8b8d07a47eb 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -3918,6 +3918,7 @@ extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd) SYNOPSIS thd_wait_begin() thd Thread object + Can be NULL, in this case current THD is used. wait_type Type of wait 1 -- short wait (e.g. for mutex) 2 -- medium wait (e.g. for disk io) @@ -3945,6 +3946,7 @@ extern "C" void thd_wait_begin(MYSQL_THD thd, int wait_type) when they waking up from a sleep/stall. @param thd Thread handle + Can be NULL, in this case current THD is used. 
*/ extern "C" void thd_wait_end(MYSQL_THD thd) { diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 65c74ffbeca..19f0b55f1fa 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2245,7 +2245,7 @@ static bool fix_threadpool_stall_limit(sys_var*, THD*, enum_var_type) #ifdef _WIN32 static Sys_var_uint Sys_threadpool_min_threads( "thread_pool_min_threads", - "Minimuim number of threads in the thread pool.", + "Minimum number of threads in the thread pool.", GLOBAL_VAR(threadpool_min_threads), CMD_LINE(REQUIRED_ARG), VALID_RANGE(1, 256), DEFAULT(1), BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), @@ -2267,8 +2267,9 @@ static Sys_var_uint Sys_threadpool_oversubscribe( ); static Sys_var_uint Sys_threadpool_size( "thread_pool_size", - "Number of concurrently executing threads in the pool. " - "Leaving value default (0) sets it to the number of processors.", + "Number of thread groups in the pool. " + "This parameter is roughly equivalent to maximum number of concurrently " + "executing threads (threads in a waiting state do not count as executing).", GLOBAL_VAR(threadpool_size), CMD_LINE(REQUIRED_ARG), VALID_RANGE(1, MAX_THREAD_GROUPS), DEFAULT(my_getncpus()), BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index 5dcc9d4420c..9dc3739dfb4 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -206,6 +206,7 @@ static void print_pool_blocked_message(bool); or io_poll_start_read() becomes readable. Data associated with descriptors can be retrieved from native_events array, using native_event_get_userdata() function. + On Linux: epoll_wait() */ @@ -248,6 +249,11 @@ int io_poll_disassociate_fd(int pollfd, int fd) } +/* + Wrapper around epoll_wait. + NOTE - in case of EINTR, it restarts with original timeout. 
Since we use + either infinite or 0 timeouts, this is not critical +*/ int io_poll_wait(int pollfd, native_event *native_events, int maxevents, int timeout_ms) { @@ -260,6 +266,7 @@ int io_poll_wait(int pollfd, native_event *native_events, int maxevents, return ret; } + static void *native_event_get_userdata(native_event *event) { return event->data.ptr; @@ -364,8 +371,10 @@ int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms (timeout_ms >= 0)?&ts:NULL); } while (ret == -1 && errno == EINTR); - return nget; + DBUG_ASSERT(nget < INT_MAX); + return (int)nget; } + static void* native_event_get_userdata(native_event *event) { return event->portev_user; @@ -375,9 +384,8 @@ static void* native_event_get_userdata(native_event *event) #endif - - /* Dequeue element from a workqueue */ + static connection_t *queue_get(thread_group_t *thread_group) { DBUG_ENTER("queue_get"); @@ -391,12 +399,12 @@ static connection_t *queue_get(thread_group_t *thread_group) } - /* Handle wait timeout : Find connections that have been idle for too long and kill them. Also, recalculate time when next timeout check should run. */ + static void timeout_check(pool_timer_t *timer) { DBUG_ENTER("timeout_check"); @@ -418,7 +426,7 @@ static void timeout_check(pool_timer_t *timer) { /* Connection does not have scheduler data. This happens for example - if THD belongs to another scheduler, that is listening to extra_port. + if THD belongs to a different scheduler, that is listening to extra_port. 
*/ continue; } @@ -458,18 +466,18 @@ static void timeout_check(pool_timer_t *timer) static void* timer_thread(void *param) { uint i; - pool_timer_t* timer=(pool_timer_t *)param; - timer->next_timeout_check= ULONGLONG_MAX; - timer->current_microtime= microsecond_interval_timer(); - + my_thread_init(); DBUG_ENTER("timer_thread"); - + timer->next_timeout_check= ULONGLONG_MAX; + timer->current_microtime= microsecond_interval_timer(); + for(;;) { struct timespec ts; int err; + set_timespec_nsec(ts,timer->tick_interval*1000000); mysql_mutex_lock(&timer->mutex); err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts); @@ -543,7 +551,6 @@ void check_stall(thread_group_t *thread_group) } - static void start_timer(pool_timer_t* timer) { pthread_t thread_id; @@ -555,6 +562,7 @@ static void start_timer(pool_timer_t* timer) DBUG_VOID_RETURN; } + static void stop_timer(pool_timer_t *timer) { DBUG_ENTER("stop_timer"); @@ -664,50 +672,50 @@ static connection_t * listener(worker_thread_t *current_thread, thread_group->queue.push_back(c); } - - if(thread_group->active_thread_count==0 && !listener_picks_event) + if (listener_picks_event) { - /* Wake one worker thread */ + /* Handle the first event. */ + retval= (connection_t *)native_event_get_userdata(&ev[0]); + mysql_mutex_unlock(&thread_group->mutex); + break; + } + + if(thread_group->active_thread_count==0) + { + /* We added some work items to queue, now wake a worker. */ if(wake_thread(thread_group)) { /* - Wake failed, groups has no idle threads. - Now check if the group has at least one worker. + Wake failed, hence groups has no idle threads. Now check if there are + any threads in the group except listener. */ if(thread_group->thread_count == 1 && thread_group->pending_thread_start_count == 0) { - /* + /* Currently there is no worker thread in the group, as indicated by - thread_count == 1 (means listener is the only one thread in the - group). - - Rhe queue is not empty, and listener is not going to handle - events. 
In order to drain the queue, we create a worker here. - Alternatively, we could just rely on timer to detect stall, but - this would be an inefficient, pointless delay. + thread_count == 1 (this means listener is the only one thread in + the group). + The queue is not empty, and listener is not going to handle + events. In order to drain the queue, we create a worker here. + Alternatively, we could just rely on timer to detect stall, and + create thread, but waiting for timer would be an inefficient and + pointless delay. */ create_worker(thread_group); } } } mysql_mutex_unlock(&thread_group->mutex); - - if (listener_picks_event) - { - retval= (connection_t *)native_event_get_userdata(&ev[0]); - break; - } } - - + DBUG_RETURN(retval); } -/* +/** Creates a new worker thread. thread_mutex must be held when calling this function @@ -806,10 +814,10 @@ static int wake_or_create_thread(thread_group_t *thread_group) if (thread_group->active_thread_count == 0) { /* - We're better off creating a new thread here with no delay, - either there is no workers at all, or they all are all blocking - and there was no sleeping thread to wakeup. It smells like deadlock - or very slowly executing requests, e.g sleeps or user locks. + We're better off creating a new thread here with no delay, either there + are no workers at all, or they all are all blocking and there was no + idle thread to wakeup. Smells like a potential deadlock or very slowly + executing requests, e.g sleeps or user locks. */ DBUG_RETURN(create_worker(thread_group)); } @@ -862,7 +870,8 @@ void thread_group_destroy(thread_group_t *thread_group) /** Wake sleeping thread from waiting list - */ +*/ + static int wake_thread(thread_group_t *thread_group) { DBUG_ENTER("wake_thread"); @@ -879,16 +888,14 @@ static int wake_thread(thread_group_t *thread_group) } -/* +/** Initiate shutdown for thread group. - - The shutdown is asynchronous, we only care to wake all threads - in here, so they can finish. 
We do not wait here until threads - terminate, - - Final cleanup of the group (thread_group_destroy) will be done by - the last exiting threads. + + The shutdown is asynchronous, we only care to wake all threads in here, so + they can finish. We do not wait here until threads terminate. Final cleanup + of the group (thread_group_destroy) will be done by the last exiting threads. */ + static void thread_group_close(thread_group_t *thread_group) { DBUG_ENTER("thread_group_close"); @@ -938,10 +945,11 @@ static void thread_group_close(thread_group_t *thread_group) perform login (this is done in worker threads). */ + static void queue_put(thread_group_t *thread_group, connection_t *connection) { DBUG_ENTER("queue_put"); - + mysql_mutex_lock(&thread_group->mutex); thread_group->queue.push_back(connection); if (thread_group->active_thread_count == 0) @@ -949,14 +957,16 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection) wake_or_create_thread(thread_group); } mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; } /* - This is used to prevent too many threads executing at the same time, - if the workload is not CPU bound. + Prevent too many threads executing at the same time,if the workload is + not CPU bound. */ + static bool too_many_threads(thread_group_t *thread_group) { return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe @@ -964,7 +974,6 @@ static bool too_many_threads(thread_group_t *thread_group) } - /** Retrieve a connection with pending event. @@ -981,17 +990,15 @@ static bool too_many_threads(thread_group_t *thread_group) @return connection with pending event. NULL is returned if timeout has expired,or on shutdown. 
*/ + connection_t *get_event(worker_thread_t *current_thread, thread_group_t *thread_group, struct timespec *abstime) { DBUG_ENTER("get_event"); - connection_t *connection = NULL; int err=0; mysql_mutex_lock(&thread_group->mutex); - - DBUG_ASSERT(thread_group->active_thread_count >= 0); do @@ -1083,6 +1090,7 @@ connection_t *get_event(worker_thread_t *current_thread, Tells the pool that worker starts waiting on IO, lock, condition, sleep() or similar. */ + void wait_begin(thread_group_t *thread_group) { DBUG_ENTER("wait_begin"); @@ -1200,7 +1208,7 @@ static void connection_abort(connection_t *connection) /** - MySQL scheduler callback : kill connection + MySQL scheduler callback : kill connection */ void tp_post_kill_notification(THD *thd) @@ -1215,7 +1223,7 @@ void tp_post_kill_notification(THD *thd) } /** - MySQL scheduler callback: wait begin + MySQL scheduler callback: wait begin */ void tp_wait_begin(THD *thd, int type) @@ -1237,7 +1245,7 @@ void tp_wait_begin(THD *thd, int type) /** - MySQL scheduler callback: wait end + MySQL scheduler callback: wait end */ void tp_wait_end(THD *thd) @@ -1256,7 +1264,7 @@ void tp_wait_end(THD *thd) DBUG_VOID_RETURN; } - + static void set_next_timeout_check(ulonglong abstime) { DBUG_ENTER("set_next_timeout_check"); @@ -1273,7 +1281,6 @@ static void set_next_timeout_check(ulonglong abstime) /** Set wait timeout for connection. */ - static void set_wait_timeout(connection_t *c) { DBUG_ENTER("set_wait_timeout"); @@ -1400,8 +1407,9 @@ static void handle_event(connection_t *connection) } /** - Worker thread's main + Worker thread's main */ + static void *worker_main(void *param) { @@ -1543,7 +1551,8 @@ void tp_set_threadpool_stall_limit(uint limit) Calculate number of idle/waiting threads in the pool. Sum idle threads over all groups. - Don't do any locking, it is not required for stats. + Don't do any locking, + it is not required for stats.
*/ int tp_get_idle_thread_count() { @@ -1601,7 +1610,8 @@ static void print_pool_blocked_message(bool max_threads_reached) else sql_print_error(create_thread_error_msg, my_errno); - sql_print_information("Threadpool has been blocked for %u seconds\n",(uint)(now- pool_block_start)); + sql_print_information("Threadpool has been blocked for %u seconds\n", + (uint)(now- pool_block_start)); /* avoid reperated messages for the same blocking situation */ msg_written= true; } diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index 7a2c8c0c6cf..ae186a614ee 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -17,6 +17,30 @@ #include +/* + Threadpool API is not available on XP. We still want to compile a single + version on Windows, but use the latest functionality if available. + We cannot use threadpool functionality directly, since executable won't + start on XP and loader will complain about missing symbols. + + We solve this the usual way it is done on Windows, i.e. with dynamic loading. + We'll need to load a lot of functions, and make this less painful with the + WEAK_SYMBOL macro below. +*/ + +/* + WEAK_SYMBOL(return_type, function_name, argument_type1,..,argument_typeN) + + Declare and load function pointer from kernel32. The name of the static + variable that holds the function pointer is my_function_name. + This should be combined with + #define function_name my_function_name + so that one could use Windows APIs transparently, without worrying whether + they are present in a particular version or not. + + Of course, prior to use of any function there should be a check for correct + Windows version, or check whether function pointer is not NULL. +*/ #define WEAK_SYMBOL(return_type, function, ...)
\ typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \ static pFN_##function my_##function = (pFN_##function) \ @@ -110,9 +134,7 @@ WEAK_SYMBOL(VOID, CloseThreadpoolWork, PTP_WORK pwk); WEAK_SYMBOL(BOOL, SetThreadpoolStackInformation, PTP_POOL, PTP_POOL_STACK_INFORMATION); #define SetThreadpoolStackInformation my_SetThreadpoolStackInformation -#endif - -#if _MSC_VER < 1600 +#else /* _MSC_VER < 1600 */ #define SetThreadpoolCallbackPriority(env,prio) typedef enum _TP_CALLBACK_PRIORITY { TP_CALLBACK_PRIORITY_HIGH, @@ -158,8 +180,6 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance, PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); -#define CONNECTION_SIGNATURE 0xAFFEAFFE - static void check_thread_init(); /* Get current time as Windows time */ @@ -178,21 +198,19 @@ static ulonglong now() struct connection_t { THD *thd; - bool logged_in; HANDLE handle; OVERLAPPED overlapped; - /* absolute time for wait timeout (as Windows time) */ volatile ulonglong timeout; - PTP_CLEANUP_GROUP cleanup_group; TP_CALLBACK_ENVIRON callback_environ; - PTP_IO io; PTP_TIMER timer; PTP_WAIT shm_read; + bool logged_in; }; + void init_connection(connection_t *connection) { connection->logged_in = false; @@ -208,6 +226,7 @@ void init_connection(connection_t *connection) connection->thd = 0; } + int init_io(connection_t *connection, THD *thd) { connection->thd= thd; @@ -237,7 +256,7 @@ int init_io(connection_t *connection, THD *thd) if (connection->handle) { /* Performance tweaks (s. 
MSDN documentation)*/ - UCHAR flags = FILE_SKIP_SET_EVENT_ON_HANDLE; + UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE; if (skip_completion_port_on_success) { flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; @@ -245,7 +264,7 @@ int init_io(connection_t *connection, THD *thd) (void)SetFileCompletionNotificationModes(connection->handle, flags); /* Assign io completion callback */ - connection->io = CreateThreadpoolIo(connection->handle, + connection->io= CreateThreadpoolIo(connection->handle, io_completion_callback, connection, &connection->callback_environ); if(!connection->io) { @@ -253,7 +272,7 @@ int init_io(connection_t *connection, THD *thd) return -1; } } - connection->timer = CreateThreadpoolTimer(timer_callback, connection, + connection->timer= CreateThreadpoolTimer(timer_callback, connection, &connection->callback_environ); if (!connection->timer) { @@ -354,6 +373,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) return -1; } + int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance) { if (threadpool_add_connection(connection->thd) == 0 @@ -380,21 +400,6 @@ void set_wait_timeout(connection_t *connection, ulonglong old_timeout) connection->timeout = new_timeout; } -/* - Terminates (idle) connection by closing the socket. 
- This will activate io_completion_callback() in a different thread -*/ -void post_kill_notification(connection_t *connection) -{ - check_thread_init(); - THD *thd=connection->thd; - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->killed = KILL_CONNECTION; - vio_shutdown(thd->net.vio, SHUT_RDWR); - thd->mysys_var= NULL; - mysql_mutex_unlock(&thd->LOCK_thd_data); -} - /* Connection destructor */ void destroy_connection(connection_t *connection) @@ -438,7 +443,6 @@ static void check_thread_init() if (FlsGetValue(fls) == NULL) { FlsSetValue(fls, (void *)1); - my_thread_init(); thread_created++; InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads); } @@ -446,28 +450,14 @@ static void check_thread_init() /* - Take care of proper cleanup when threadpool threads exit. - We do not control how threads are created, thus it is our responsibility to - check that my_thread_init() is called on thread initialization and - my_thread_end() on thread destruction. On Windows, FlsAlloc() provides the - thread destruction callbacks. + Decrement the number of worker threads when a thread exits. + On Windows, FlsAlloc() provides the thread destruction callbacks. */ static VOID WINAPI thread_destructor(void *data) { if(data) { - if (InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads) >= 0) - { - /* - The above check for number of thread >= 0 is due to shutdown code ( - see tp_end()) where we forcefully set num_worker_threads to 0, even - if not all threads have shut down yet to the point they would ran Fls - destructors, even after CloseThreadpool(). See also comment in tp_end().
- */ - mysql_mutex_lock(&LOCK_thread_count); - my_thread_end(); - mysql_mutex_unlock(&LOCK_thread_count); - } + InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads); } } @@ -507,7 +497,7 @@ bool tp_init(void) { TP_POOL_STACK_INFORMATION stackinfo; stackinfo.StackCommit = 0; - stackinfo.StackReserve = my_thread_stack_size; + stackinfo.StackReserve = (SIZE_T)my_thread_stack_size; if (!SetThreadpoolStackInformation(pool, &stackinfo)) { tp_log_warning("Can't set threadpool stack size", @@ -520,45 +510,19 @@ bool tp_init(void) } -/* +/** Scheduler callback : Destroy the scheduler. */ - -extern "C" uint THR_thread_count; -extern "C" mysql_mutex_t THR_LOCK_threads; -extern "C" mysql_cond_t THR_COND_threads; - void tp_end(void) { if(pool) { SetThreadpoolThreadMaximum(pool, 0); CloseThreadpool(pool); - - /* - Tell my_global_thread_end() we're complete. - - This would not be necessary if CloseThreadpool() would synchronously - release all threads and wait until they disappear and call all their FLS - destructors . However, threads in the pool are released asynchronously - and might spend some time in the CRT shutdown code. Thus zero - num_worker_threads, to avoid thread destructor's my_thread_end()s after - this point. - */ - LONG remaining_threads= - InterlockedExchange( (volatile long *)&tp_stats.num_worker_threads, 0); - - if (remaining_threads) - { - mysql_mutex_lock(&THR_LOCK_threads); - THR_thread_count -= remaining_threads; - mysql_cond_signal(&THR_COND_threads); - mysql_mutex_unlock(&THR_LOCK_threads); - } } } -/* +/** Notify pool about connection being killed. 
*/ void tp_post_kill_notification(THD *thd) @@ -606,7 +570,7 @@ error: DisassociateCurrentThreadFromCallback(instance); destroy_connection(connection); - my_free(connection); + free(connection); } @@ -623,7 +587,7 @@ static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance, if (login(connection, instance) != 0) { destroy_connection(connection); - my_free(connection); + free(connection); } } @@ -688,9 +652,8 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, */ void tp_add_connection(THD *thd) { - bool success = false; - connection_t *con = (connection_t *)my_malloc(sizeof(connection_t), 0); - + connection_t *con = (connection_t *)malloc(sizeof(connection_t)); + if (con) threads.append(thd); mysql_mutex_unlock(&LOCK_thread_count); @@ -698,6 +661,7 @@ void tp_add_connection(THD *thd) if(!con) { tp_log_warning("Allocation failed", "tp_add_connection"); + threadpool_remove_connection(thd); return; } @@ -718,8 +682,7 @@ void tp_add_connection(THD *thd) } - -/* +/** Sets the number of idle threads the thread pool maintains in anticipation of new requests. */