mirror of
https://github.com/MariaDB/server.git
synced 2026-01-06 05:22:24 +03:00
Threadpool : address some of the monty's review points
Also, print message when pool blocks.
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
#include <time.h>
|
||||
#include <sql_plist.h>
|
||||
#include <threadpool.h>
|
||||
#include <time.h>
|
||||
#ifdef __linux__
|
||||
#include <sys/epoll.h>
|
||||
typedef struct epoll_event native_event;
|
||||
@@ -91,6 +92,7 @@ struct connection_t
|
||||
connection_t **prev_in_queue;
|
||||
ulonglong abs_wait_timeout;
|
||||
bool logged_in;
|
||||
bool bound_to_poll_descriptor;
|
||||
bool waiting;
|
||||
};
|
||||
|
||||
@@ -127,6 +129,12 @@ struct thread_group_t
|
||||
static thread_group_t all_groups[MAX_THREAD_GROUPS];
|
||||
static uint group_count;
|
||||
|
||||
/**
|
||||
Used for printing "pool blocked" message, see
|
||||
print_pool_blocked_message();
|
||||
*/
|
||||
static time_t pool_block_start;
|
||||
|
||||
/* Global timer for all groups */
|
||||
struct pool_timer_t
|
||||
{
|
||||
@@ -140,6 +148,7 @@ struct pool_timer_t
|
||||
|
||||
static pool_timer_t pool_timer;
|
||||
|
||||
|
||||
/* Externals functions and variables we use */
|
||||
extern void scheduler_init();
|
||||
extern pthread_attr_t *get_connection_attrib(void);
|
||||
@@ -155,7 +164,7 @@ static void connection_abort(connection_t *connection);
|
||||
void tp_post_kill_notification(THD *thd);
|
||||
static void set_wait_timeout(connection_t *connection);
|
||||
static void set_next_timeout_check(ulonglong abstime);
|
||||
|
||||
static void print_pool_blocked_message(bool);
|
||||
|
||||
/**
|
||||
Asynchronous network IO.
|
||||
@@ -641,9 +650,9 @@ static connection_t * listener(worker_thread_t *current_thread,
|
||||
and wake a worker.
|
||||
|
||||
NOTE: Currently nothing is done to detect or prevent long queuing times.
|
||||
A solution (for the future) would be to give up "one active thread per group"
|
||||
principle, if events stay in the queue for too long, and wake more workers.
|
||||
|
||||
A solutionc for the future would be to give up "one active thread per
|
||||
group" principle, if events stay in the queue for too long, and just wake
|
||||
more workers.
|
||||
*/
|
||||
|
||||
bool listener_picks_event= thread_group->queue.is_empty();
|
||||
@@ -714,16 +723,18 @@ static connection_t * listener(worker_thread_t *current_thread,
|
||||
static int create_worker(thread_group_t *thread_group)
|
||||
{
|
||||
pthread_t thread_id;
|
||||
bool max_threads_reached= false;
|
||||
int err;
|
||||
|
||||
DBUG_ENTER("create_worker");
|
||||
if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
|
||||
&& thread_group->thread_count >= 2)
|
||||
{
|
||||
DBUG_PRINT("info",
|
||||
("Cannot create new thread (maximum allowed threads reached)"));
|
||||
DBUG_RETURN(-1);
|
||||
err= 1;
|
||||
max_threads_reached= true;
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
||||
err= mysql_thread_create(key_worker_thread, &thread_id,
|
||||
thread_group->pthread_attr, worker_main, thread_group);
|
||||
if (!err)
|
||||
@@ -731,6 +742,18 @@ static int create_worker(thread_group_t *thread_group)
|
||||
thread_group->pending_thread_start_count++;
|
||||
thread_group->last_thread_creation_time=microsecond_interval_timer();
|
||||
}
|
||||
else
|
||||
{
|
||||
my_errno= errno;
|
||||
}
|
||||
|
||||
|
||||
end:
|
||||
if (err)
|
||||
print_pool_blocked_message(max_threads_reached);
|
||||
else
|
||||
pool_block_start= 0; /* Reset pool blocked timer, if it was set */
|
||||
|
||||
DBUG_RETURN(err);
|
||||
}
|
||||
|
||||
@@ -816,7 +839,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
|
||||
DBUG_ENTER("thread_group_init");
|
||||
thread_group->pthread_attr = thread_attr;
|
||||
mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
|
||||
thread_group->pollfd=-1;
|
||||
thread_group->pollfd= -1;
|
||||
thread_group->shutdown_pipe[0]= -1;
|
||||
thread_group->shutdown_pipe[1]= -1;
|
||||
DBUG_RETURN(0);
|
||||
@@ -1113,6 +1136,7 @@ connection_t *alloc_connection(THD *thd)
|
||||
connection->thd = thd;
|
||||
connection->waiting= false;
|
||||
connection->logged_in= false;
|
||||
connection->bound_to_poll_descriptor= false;
|
||||
connection->abs_wait_timeout= ULONGLONG_MAX;
|
||||
}
|
||||
DBUG_RETURN(connection);
|
||||
@@ -1289,8 +1313,11 @@ static int change_group(connection_t *c,
|
||||
|
||||
/* Remove connection from the old group. */
|
||||
mysql_mutex_lock(&old_group->mutex);
|
||||
if (c->logged_in)
|
||||
if (c->bound_to_poll_descriptor)
|
||||
{
|
||||
io_poll_disassociate_fd(old_group->pollfd,fd);
|
||||
c->bound_to_poll_descriptor= false;
|
||||
}
|
||||
c->thread_group->connection_count--;
|
||||
mysql_mutex_unlock(&old_group->mutex);
|
||||
|
||||
@@ -1325,22 +1352,18 @@ static int start_io(connection_t *connection)
|
||||
|
||||
if (group != connection->thread_group)
|
||||
{
|
||||
if (!change_group(connection, connection->thread_group, group))
|
||||
if (change_group(connection, connection->thread_group, group))
|
||||
{
|
||||
connection->logged_in= true;
|
||||
return io_poll_associate_fd(group->pollfd, fd, connection);
|
||||
}
|
||||
else
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
In case binding to a poll descriptor was not yet done,
|
||||
(start_io called first time), do it now.
|
||||
Bind to poll descriptor if not yet done.
|
||||
*/
|
||||
if(!connection->logged_in)
|
||||
if(!connection->bound_to_poll_descriptor)
|
||||
{
|
||||
connection->logged_in= true;
|
||||
connection->bound_to_poll_descriptor= true;
|
||||
return io_poll_associate_fd(group->pollfd, fd, connection);
|
||||
}
|
||||
|
||||
@@ -1353,36 +1376,34 @@ static void handle_event(connection_t *connection)
|
||||
{
|
||||
|
||||
DBUG_ENTER("handle_event");
|
||||
int ret;
|
||||
int err;
|
||||
|
||||
if (!connection->logged_in)
|
||||
{
|
||||
ret= threadpool_add_connection(connection->thd);
|
||||
err = threadpool_add_connection(connection->thd);
|
||||
connection->logged_in= true;
|
||||
}
|
||||
else
|
||||
{
|
||||
ret= threadpool_process_request(connection->thd);
|
||||
err = threadpool_process_request(connection->thd);
|
||||
}
|
||||
|
||||
if(!ret)
|
||||
if(!err)
|
||||
{
|
||||
set_wait_timeout(connection);
|
||||
ret= start_io(connection);
|
||||
err= start_io(connection);
|
||||
}
|
||||
|
||||
if (ret)
|
||||
if (err)
|
||||
{
|
||||
connection_abort(connection);
|
||||
}
|
||||
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
/**
|
||||
Worker thread's main
|
||||
*/
|
||||
|
||||
static void *worker_main(void *param)
|
||||
{
|
||||
|
||||
@@ -1432,8 +1453,9 @@ static void *worker_main(void *param)
|
||||
my_atomic_add32(&tp_stats.num_worker_threads, -1);
|
||||
|
||||
/* If it is the last thread in group and pool is terminating, destroy group.*/
|
||||
if (thread_group->shutdown && thread_group->thread_count == 0
|
||||
&& thread_group->pending_thread_start_count == 0)
|
||||
if (thread_group->shutdown
|
||||
&& thread_group->thread_count == 0
|
||||
&& thread_group->pending_thread_start_count == 0)
|
||||
{
|
||||
thread_group_destroy(thread_group);
|
||||
}
|
||||
@@ -1480,7 +1502,8 @@ void tp_end()
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
/* Ensure that poll descriptors are created when threadpool_size changes */
|
||||
|
||||
/** Ensure that poll descriptors are created when threadpool_size changes */
|
||||
int tp_set_threadpool_size(uint size)
|
||||
{
|
||||
bool success= true;
|
||||
@@ -1513,8 +1536,8 @@ void tp_set_threadpool_stall_limit(uint limit)
|
||||
return;
|
||||
mysql_mutex_lock(&(pool_timer.mutex));
|
||||
pool_timer.tick_interval= limit;
|
||||
mysql_cond_signal(&(pool_timer.cond));
|
||||
mysql_mutex_unlock(&(pool_timer.mutex));
|
||||
mysql_cond_signal(&(pool_timer.cond));
|
||||
}
|
||||
|
||||
|
||||
@@ -1527,9 +1550,61 @@ void tp_set_threadpool_stall_limit(uint limit)
|
||||
int tp_get_idle_thread_count()
|
||||
{
|
||||
int sum=0;
|
||||
for(uint i= 0; i< array_elements(all_groups) && (all_groups[i].pollfd >= 0); i++)
|
||||
for(uint i= 0;
|
||||
i< array_elements(all_groups) && (all_groups[i].pollfd >= 0);
|
||||
i++)
|
||||
{
|
||||
sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
|
||||
/* Report threadpool problems */
|
||||
|
||||
#define BLOCK_MSG_DELAY 30
|
||||
|
||||
static const char *max_threads_reached_msg=
|
||||
"Threadpool could not create additional thread to handle queries, because the "
|
||||
"number of allowed threads was reached. Increasing 'thread_pool_max_threads' "
|
||||
"parameter can help in this situation.\n"
|
||||
"If 'extra_port' parameter is set, you can still connect to the database with "
|
||||
"superuser account (it must be TCP connection using extra_port as TCP port) "
|
||||
"and troubleshoot the situation. "
|
||||
"A likely cause of pool blocks are clients that lock resources for long time. "
|
||||
"'show processlist' or 'show engine innodb status' can give additional hints.";
|
||||
|
||||
static const char *create_thread_error_msg=
|
||||
"Can't create threads in threadpool (errno=%d).";
|
||||
|
||||
/**
|
||||
Write a message when blocking situation in threadpool occurs.
|
||||
The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds.
|
||||
It will be just a single message for each blocking situation (to prevent
|
||||
log flood).
|
||||
*/
|
||||
static void print_pool_blocked_message(bool max_threads_reached)
|
||||
{
|
||||
time_t now;
|
||||
static bool msg_written;
|
||||
|
||||
now= time(NULL);
|
||||
if (pool_block_start == 0)
|
||||
{
|
||||
pool_block_start= now;
|
||||
msg_written = false;
|
||||
return;
|
||||
}
|
||||
|
||||
if(now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
|
||||
{
|
||||
if(max_threads_reached)
|
||||
sql_print_error(max_threads_reached_msg);
|
||||
else
|
||||
sql_print_error(create_thread_error_msg, my_errno);
|
||||
|
||||
sql_print_information("Threadpool has been blocked for %u seconds\n",(uint)(now- pool_block_start));
|
||||
/* avoid reperated messages for the same blocking situation */
|
||||
msg_written= true;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user