mirror of
https://github.com/MariaDB/server.git
synced 2025-07-27 18:02:13 +03:00
Threadpool -address review comments
This commit is contained in:
@ -9,30 +9,28 @@ extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall c
|
|||||||
extern uint threadpool_max_threads; /* Maximum threads in pool */
|
extern uint threadpool_max_threads; /* Maximum threads in pool */
|
||||||
extern uint threadpool_oversubscribe; /* Maximum active threads in group */
|
extern uint threadpool_oversubscribe; /* Maximum active threads in group */
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Functions used by scheduler.
|
||||||
|
OS-specific implementations are in
|
||||||
|
threadpool_unix.cc or threadpool_win.cc
|
||||||
|
*/
|
||||||
|
extern bool tp_init();
|
||||||
|
extern void tp_add_connection(THD*);
|
||||||
|
extern void tp_wait_begin(THD *, int);
|
||||||
|
extern void tp_wait_end(THD*);
|
||||||
|
extern void tp_post_kill_notification(THD *thd);
|
||||||
|
extern void tp_end(void);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Threadpool statistics
|
Threadpool statistics
|
||||||
*/
|
*/
|
||||||
struct TP_STATISTICS
|
struct TP_STATISTICS
|
||||||
{
|
{
|
||||||
/* Current number of worker thread. */
|
/* Current number of worker thread. */
|
||||||
volatile int num_worker_threads;
|
volatile int32 num_worker_threads;
|
||||||
/* Current number of idle threads. */
|
/* Current number of idle threads. */
|
||||||
volatile int num_waiting_threads;
|
volatile int32 num_waiting_threads;
|
||||||
/* Number of login requests are queued but not yet processed. */
|
|
||||||
volatile int pending_login_requests;
|
|
||||||
/* Number of threads that are starting. */
|
|
||||||
volatile int pending_thread_starts;
|
|
||||||
/* Number of threads that are being shut down */
|
|
||||||
volatile int pending_thread_shutdowns;
|
|
||||||
/* Time (in milliseconds) since pool is blocked (num_waiting_threads is 0) */
|
|
||||||
ulonglong pool_block_duration;
|
|
||||||
/* Maximum duration of the pending login, im milliseconds. */
|
|
||||||
ulonglong pending_login_duration;
|
|
||||||
/* Time since last thread was created */
|
|
||||||
ulonglong time_since_last_thread_creation;
|
|
||||||
/* Number of requests processed since pool monitor run last time. */
|
|
||||||
volatile int requests_dequeued;
|
|
||||||
volatile int requests_completed;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
extern TP_STATISTICS tp_stats;
|
extern TP_STATISTICS tp_stats;
|
||||||
|
@ -7,15 +7,9 @@
|
|||||||
#include <sql_connect.h>
|
#include <sql_connect.h>
|
||||||
#include <sql_audit.h>
|
#include <sql_audit.h>
|
||||||
#include <debug_sync.h>
|
#include <debug_sync.h>
|
||||||
|
#include <threadpool.h>
|
||||||
|
|
||||||
|
|
||||||
extern bool login_connection(THD *thd);
|
|
||||||
extern bool do_command(THD *thd);
|
|
||||||
extern void prepare_new_connection_state(THD* thd);
|
|
||||||
extern void end_connection(THD *thd);
|
|
||||||
extern void thd_cleanup(THD *thd);
|
|
||||||
extern void delete_thd(THD *thd);
|
|
||||||
|
|
||||||
/* Threadpool parameters */
|
/* Threadpool parameters */
|
||||||
|
|
||||||
uint threadpool_min_threads;
|
uint threadpool_min_threads;
|
||||||
@ -27,14 +21,15 @@ uint threadpool_oversubscribe;
|
|||||||
|
|
||||||
|
|
||||||
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
|
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
|
||||||
|
extern bool do_command(THD*);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Worker threads contexts, and THD contexts.
|
Worker threads contexts, and THD contexts.
|
||||||
=====================================
|
=========================================
|
||||||
|
|
||||||
Both worker threads and connections have their sets of thread local variables
|
Both worker threads and connections have their sets of thread local variables
|
||||||
At the moment it is mysys_var (which has e.g dbug my_error and similar
|
At the moment it is mysys_var (this has specific data for dbug, my_error and
|
||||||
goodies inside), and PSI per-client structure.
|
similar goodies), and PSI per-client structure.
|
||||||
|
|
||||||
Whenever query is executed following needs to be done:
|
Whenever query is executed following needs to be done:
|
||||||
|
|
||||||
@ -77,7 +72,7 @@ struct Worker_thread_context
|
|||||||
/*
|
/*
|
||||||
Attach/associate the connection with the OS thread,
|
Attach/associate the connection with the OS thread,
|
||||||
*/
|
*/
|
||||||
static inline bool thread_attach(THD* thd)
|
static bool thread_attach(THD* thd)
|
||||||
{
|
{
|
||||||
pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
|
pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
|
||||||
thd->thread_stack=(char*)&thd;
|
thd->thread_stack=(char*)&thd;
|
||||||
@ -96,10 +91,9 @@ int threadpool_add_connection(THD *thd)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
Create a new connection context: mysys_thread_var and PSI thread
|
Create a new connection context: mysys_thread_var and PSI thread
|
||||||
Store them in thd->mysys_var and thd->scheduler.m_psi.
|
Store them in THD.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* Use my_thread_init() to create new mysys_thread_var. */
|
|
||||||
pthread_setspecific(THR_KEY_mysys, 0);
|
pthread_setspecific(THR_KEY_mysys, 0);
|
||||||
my_thread_init();
|
my_thread_init();
|
||||||
thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
|
thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
|
||||||
@ -125,21 +119,29 @@ int threadpool_add_connection(THD *thd)
|
|||||||
thd->start_utime= now;
|
thd->start_utime= now;
|
||||||
thd->thr_create_utime= now;
|
thd->thr_create_utime= now;
|
||||||
|
|
||||||
if (setup_connection_thread_globals(thd) == 0)
|
if (!setup_connection_thread_globals(thd))
|
||||||
{
|
{
|
||||||
if (login_connection(thd) == 0)
|
if (!login_connection(thd))
|
||||||
{
|
{
|
||||||
prepare_new_connection_state(thd);
|
prepare_new_connection_state(thd);
|
||||||
retval = thd_is_connection_alive(thd)?0:-1;
|
|
||||||
thd->net.reading_or_writing= 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
thd->skip_wait_timeout= true;
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Check if THD is ok, as prepare_new_connection_state()
|
||||||
|
can fail, for example if init command failed.
|
||||||
|
*/
|
||||||
|
if (thd_is_connection_alive(thd))
|
||||||
|
{
|
||||||
|
retval= 0;
|
||||||
|
thd->net.reading_or_writing= 1;
|
||||||
|
thd->skip_wait_timeout= true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
worker_context.restore();
|
worker_context.restore();
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void threadpool_remove_connection(THD *thd)
|
void threadpool_remove_connection(THD *thd)
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -147,9 +149,7 @@ void threadpool_remove_connection(THD *thd)
|
|||||||
worker_context.save();
|
worker_context.save();
|
||||||
|
|
||||||
thread_attach(thd);
|
thread_attach(thd);
|
||||||
|
|
||||||
thd->killed= KILL_CONNECTION;
|
thd->killed= KILL_CONNECTION;
|
||||||
|
|
||||||
thd->net.reading_or_writing= 0;
|
thd->net.reading_or_writing= 0;
|
||||||
|
|
||||||
end_connection(thd);
|
end_connection(thd);
|
||||||
@ -163,11 +163,13 @@ void threadpool_remove_connection(THD *thd)
|
|||||||
mysql_mutex_unlock(&LOCK_thread_count);
|
mysql_mutex_unlock(&LOCK_thread_count);
|
||||||
mysql_cond_broadcast(&COND_thread_count);
|
mysql_cond_broadcast(&COND_thread_count);
|
||||||
|
|
||||||
/* Free resources (thread_var and PSI connection specific struct)*/
|
/*
|
||||||
|
Free resources associated with this connection:
|
||||||
|
mysys thread_var and PSI thread.
|
||||||
|
*/
|
||||||
my_thread_end();
|
my_thread_end();
|
||||||
|
|
||||||
worker_context.restore();
|
worker_context.restore();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int threadpool_process_request(THD *thd)
|
int threadpool_process_request(THD *thd)
|
||||||
@ -181,8 +183,8 @@ int threadpool_process_request(THD *thd)
|
|||||||
if (thd->killed >= KILL_CONNECTION)
|
if (thd->killed >= KILL_CONNECTION)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
kill flag can be set have been killed by
|
killed flag was set by timeout handler
|
||||||
timeout handler or by a KILL command
|
or KILL command. Return error.
|
||||||
*/
|
*/
|
||||||
worker_context.restore();
|
worker_context.restore();
|
||||||
return 1;
|
return 1;
|
||||||
@ -206,33 +208,18 @@ int threadpool_process_request(THD *thd)
|
|||||||
vio= thd->net.vio;
|
vio= thd->net.vio;
|
||||||
if (!vio->has_data(vio))
|
if (!vio->has_data(vio))
|
||||||
{
|
{
|
||||||
/*
|
/* More info on this debug sync is in sql_parse.cc*/
|
||||||
More info on this debug sync is in sql_parse.cc
|
|
||||||
*/
|
|
||||||
DEBUG_SYNC(thd, "before_do_command_net_read");
|
DEBUG_SYNC(thd, "before_do_command_net_read");
|
||||||
|
thd->net.reading_or_writing= 1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!retval)
|
|
||||||
thd->net.reading_or_writing= 1;
|
|
||||||
|
|
||||||
worker_context.restore();
|
worker_context.restore();
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
Scheduler struct, individual functions are implemented
|
|
||||||
in threadpool_unix.cc or threadpool_win.cc
|
|
||||||
*/
|
|
||||||
|
|
||||||
extern bool tp_init();
|
|
||||||
extern void tp_add_connection(THD*);
|
|
||||||
extern void tp_wait_begin(THD *, int);
|
|
||||||
extern void tp_wait_end(THD*);
|
|
||||||
extern void tp_post_kill_notification(THD *thd);
|
|
||||||
extern void tp_end(void);
|
|
||||||
|
|
||||||
static scheduler_functions tp_scheduler_functions=
|
static scheduler_functions tp_scheduler_functions=
|
||||||
{
|
{
|
||||||
0, // max_threads
|
0, // max_threads
|
||||||
@ -255,7 +242,7 @@ void pool_of_threads_scheduler(struct scheduler_functions *func,
|
|||||||
uint *arg_connection_count)
|
uint *arg_connection_count)
|
||||||
{
|
{
|
||||||
*func = tp_scheduler_functions;
|
*func = tp_scheduler_functions;
|
||||||
func->max_threads= *arg_max_connections + 1;
|
func->max_threads= threadpool_max_threads;
|
||||||
func->max_connections= arg_max_connections;
|
func->max_connections= arg_max_connections;
|
||||||
func->connection_count= arg_connection_count;
|
func->connection_count= arg_connection_count;
|
||||||
scheduler_init();
|
scheduler_init();
|
||||||
|
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user