
MDEV-10297 Add priorization to threadpool

Also MDEV-10385 Threadpool refactoring
Vladislav Vaintroub
2016-09-21 14:28:42 +00:00
parent f32a511558
commit f7a7c0c2fe
11 changed files with 960 additions and 666 deletions

View File

@ -1,6 +1,6 @@
--- mysqld--help.result --- mysqld--help.result 2016-09-21 13:50:58.682767100 +0000
+++ mysqld--help,win.reject +++ mysqld--help,win.reject 2016-09-21 13:57:57.494626000 +0000
@@ -321,7 +321,6 @@ @@ -318,7 +318,6 @@
The number of segments in a key cache The number of segments in a key cache
-L, --language=name Client error messages in given language. May be given as -L, --language=name Client error messages in given language. May be given as
a full path. Deprecated. Use --lc-messages-dir instead. a full path. Deprecated. Use --lc-messages-dir instead.
@ -8,7 +8,7 @@
--lc-messages=name Set the language used for the error messages. --lc-messages=name Set the language used for the error messages.
-L, --lc-messages-dir=name -L, --lc-messages-dir=name
Directory where error messages are Directory where error messages are
@@ -520,6 +519,7 @@ @@ -517,6 +516,7 @@
Use MySQL-5.6 (instead of MariaDB-5.3) format for TIME, Use MySQL-5.6 (instead of MariaDB-5.3) format for TIME,
DATETIME, TIMESTAMP columns. DATETIME, TIMESTAMP columns.
(Defaults to on; use --skip-mysql56-temporal-format to disable.) (Defaults to on; use --skip-mysql56-temporal-format to disable.)
@ -16,7 +16,7 @@
--net-buffer-length=# --net-buffer-length=#
Buffer length for TCP/IP and socket communication Buffer length for TCP/IP and socket communication
--net-read-timeout=# --net-read-timeout=#
@@ -927,6 +927,9 @@ @@ -924,6 +924,9 @@
characteristics (isolation level, read only/read characteristics (isolation level, read only/read
write,snapshot - but not any work done / data modified write,snapshot - but not any work done / data modified
within the transaction). within the transaction).
@ -26,7 +26,7 @@
--show-slave-auth-info --show-slave-auth-info
Show user and password in SHOW SLAVE HOSTS on this Show user and password in SHOW SLAVE HOSTS on this
master. master.
@@ -1039,6 +1042,10 @@ @@ -1036,6 +1039,10 @@
Log slow queries to given log file. Defaults logging to Log slow queries to given log file. Defaults logging to
'hostname'-slow.log. Must be enabled to activate other 'hostname'-slow.log. Must be enabled to activate other
slow log options slow log options
@ -37,7 +37,7 @@
--socket=name Socket file to use for connection --socket=name Socket file to use for connection
--sort-buffer-size=# --sort-buffer-size=#
Each thread that needs to do a sort allocates a buffer of Each thread that needs to do a sort allocates a buffer of
@@ -1057,6 +1064,7 @@ @@ -1054,6 +1061,7 @@
NO_ENGINE_SUBSTITUTION, PAD_CHAR_TO_FULL_LENGTH NO_ENGINE_SUBSTITUTION, PAD_CHAR_TO_FULL_LENGTH
--stack-trace Print a symbolic stack trace on failure --stack-trace Print a symbolic stack trace on failure
(Defaults to on; use --skip-stack-trace to disable.) (Defaults to on; use --skip-stack-trace to disable.)
@ -45,35 +45,19 @@
--standards-compliant-cte --standards-compliant-cte
Allow only standards compiant CTE Allow only standards compiant CTE
(Defaults to on; use --skip-standards-compliant-cte to disable.) (Defaults to on; use --skip-standards-compliant-cte to disable.)
@@ -1099,25 +1107,11 @@ @@ -1102,6 +1110,11 @@
--thread-cache-size=#
How many threads we should keep in a cache for reuse.
These are freed after 5 minutes of idle time
- --thread-pool-idle-timeout=#
- Timeout in seconds for an idle thread in the thread
- pool.Worker thread will be shut down after timeout
--thread-pool-max-threads=# --thread-pool-max-threads=#
Maximum allowed number of worker threads in the thread Maximum allowed number of worker threads in the thread
pool pool
- --thread-pool-oversubscribe=#
- How many additional active worker threads in a group are
- allowed.
- --thread-pool-size=#
- Number of thread groups in the pool. This parameter is
- roughly equivalent to maximum number of concurrently
- executing threads (threads in a waiting state do not
- count as executing).
- --thread-pool-stall-limit=#
- Maximum query execution time in milliseconds,before an
- executing non-yielding thread is considered stalled.If a
- worker thread is stalled, additional worker thread may be
- created to handle remaining clients.
+ --thread-pool-min-threads=# + --thread-pool-min-threads=#
+ Minimum number of threads in the thread pool. + Minimum number of threads in the thread pool.
--thread-stack=# The stack size for each thread + --thread-pool-mode=name
--time-format=name The TIME format (ignored) + Chose implementation of the threadpool. One of: windows,
--timed-mutexes Specify whether to time mutexes. Deprecated, has no + generic
@@ -1126,8 +1120,8 @@ --thread-pool-oversubscribe=#
How many additional active worker threads in a group are
allowed.
@@ -1132,8 +1145,8 @@
size, MySQL will automatically convert it to an on-disk size, MySQL will automatically convert it to an on-disk
MyISAM or Aria table MyISAM or Aria table
-t, --tmpdir=name Path for temporary files. Several paths may be specified, -t, --tmpdir=name Path for temporary files. Several paths may be specified,
@ -84,7 +68,7 @@
--transaction-alloc-block-size=# --transaction-alloc-block-size=#
Allocation block size for transactions to be stored in Allocation block size for transactions to be stored in
binary log binary log
@@ -1252,7 +1246,6 @@ @@ -1257,7 +1270,6 @@
key-cache-division-limit 100 key-cache-division-limit 100
key-cache-file-hash-size 512 key-cache-file-hash-size 512
key-cache-segments 0 key-cache-segments 0
@ -92,7 +76,7 @@
lc-messages en_US lc-messages en_US
lc-messages-dir MYSQL_SHAREDIR/ lc-messages-dir MYSQL_SHAREDIR/
lc-time-names en_US lc-time-names en_US
@@ -1319,6 +1312,7 @@ @@ -1324,6 +1336,7 @@
myisam-stats-method NULLS_UNEQUAL myisam-stats-method NULLS_UNEQUAL
myisam-use-mmap FALSE myisam-use-mmap FALSE
mysql56-temporal-format TRUE mysql56-temporal-format TRUE
@ -100,7 +84,7 @@
net-buffer-length 16384 net-buffer-length 16384
net-read-timeout 30 net-read-timeout 30
net-retry-count 10 net-retry-count 10
@@ -1419,6 +1413,8 @@ @@ -1424,6 +1437,8 @@
session-track-state-change FALSE session-track-state-change FALSE
session-track-system-variables session-track-system-variables
session-track-transaction-info OFF session-track-transaction-info OFF
@ -109,7 +93,7 @@
show-slave-auth-info FALSE show-slave-auth-info FALSE
silent-startup FALSE silent-startup FALSE
skip-grant-tables TRUE skip-grant-tables TRUE
@@ -1443,6 +1439,7 @@ @@ -1448,6 +1463,7 @@
slave-type-conversions slave-type-conversions
slow-launch-time 2 slow-launch-time 2
slow-query-log FALSE slow-query-log FALSE
@ -117,7 +101,7 @@
sort-buffer-size 2097152 sort-buffer-size 2097152
sql-mode NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION sql-mode NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
stack-trace TRUE stack-trace TRUE
@@ -1456,16 +1453,14 @@ @@ -1461,14 +1477,16 @@
sync-relay-log 10000 sync-relay-log 10000
sync-relay-log-info 10000 sync-relay-log-info 10000
sysdate-is-now FALSE sysdate-is-now FALSE
@ -129,11 +113,10 @@
table-open-cache-instances 8 table-open-cache-instances 8
tc-heuristic-recover OFF tc-heuristic-recover OFF
thread-cache-size 151 thread-cache-size 151
-thread-pool-idle-timeout 60 thread-pool-idle-timeout 60
thread-pool-max-threads 1000 thread-pool-max-threads 1000
-thread-pool-oversubscribe 3
-thread-pool-stall-limit 500
+thread-pool-min-threads 1 +thread-pool-min-threads 1
thread-stack 297984 +thread-pool-mode windows
time-format %H:%i:%s thread-pool-oversubscribe 3
timed-mutexes FALSE thread-pool-prio-kickup-timer 1000
thread-pool-priority auto

View File

@@ -1105,6 +1105,15 @@ The following options may be given as the first argument:
 --thread-pool-oversubscribe=#
     How many additional active worker threads in a group are
     allowed.
+--thread-pool-prio-kickup-timer=#
+    The number of milliseconds before a dequeued low-priority
+    statement is moved to the high-priority queue
+--thread-pool-priority=name
+    Threadpool priority. High priority connections usually
+    start executing earlier than low priority.If priority set
+    to 'auto', the the actual priority(low or high) is
+    determined based on whether or not connection is inside
+    transaction.
 --thread-pool-size=#
     Number of thread groups in the pool. This parameter is
     roughly equivalent to maximum number of concurrently
@@ -1461,6 +1470,8 @@ thread-cache-size 151
 thread-pool-idle-timeout 60
 thread-pool-max-threads 1000
 thread-pool-oversubscribe 3
+thread-pool-prio-kickup-timer 1000
+thread-pool-priority auto
 thread-pool-stall-limit 500
 thread-stack 297984
 time-format %H:%i:%s
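
The two options added above interact: a session's thread_pool_priority decides which queue a statement initially lands in, and thread_pool_prio_kickup_timer bounds how long a low-priority statement can sit there before being promoted. Below is a rough, standalone model of that interaction; it is illustrative only, the helper names effective_priority and should_kick_up are invented here, and the authoritative logic is get_priority() and check_stall() later in this commit.

#include <cassert>
#include <cstdint>

enum TP_PRIORITY { TP_PRIORITY_HIGH, TP_PRIORITY_LOW, TP_PRIORITY_AUTO };

/* 'auto' resolves to HIGH inside an open transaction, LOW otherwise. */
static TP_PRIORITY effective_priority(TP_PRIORITY setting, bool in_transaction)
{
  if (setting == TP_PRIORITY_AUTO)
    return in_transaction ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
  return setting;
}

/* The kickup timer is in milliseconds, the pool clock in microseconds. */
static bool should_kick_up(uint64_t now_us, uint64_t queued_us,
                           unsigned kickup_timer_ms)
{
  return now_us - queued_us > 1000ULL * kickup_timer_ms;
}

int main()
{
  assert(effective_priority(TP_PRIORITY_AUTO, true)  == TP_PRIORITY_HIGH);
  assert(effective_priority(TP_PRIORITY_AUTO, false) == TP_PRIORITY_LOW);
  assert(should_kick_up(2500000, 1000000, 1000));   /* waited 1.5s > 1s default */
  assert(!should_kick_up(1400000, 1000000, 1000));  /* 0.4s, stays low priority */
  return 0;
}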

View File

@@ -4755,6 +4755,34 @@ NUMERIC_BLOCK_SIZE 1
 ENUM_VALUE_LIST NULL
 READ_ONLY NO
 COMMAND_LINE_ARGUMENT REQUIRED
+VARIABLE_NAME THREAD_POOL_PRIORITY
+SESSION_VALUE auto
+GLOBAL_VALUE auto
+GLOBAL_VALUE_ORIGIN COMPILE-TIME
+DEFAULT_VALUE auto
+VARIABLE_SCOPE SESSION
+VARIABLE_TYPE ENUM
+VARIABLE_COMMENT Threadpool priority. High priority connections usually start executing earlier than low priority.If priority set to 'auto', the the actual priority(low or high) is determined based on whether or not connection is inside transaction.
+NUMERIC_MIN_VALUE NULL
+NUMERIC_MAX_VALUE NULL
+NUMERIC_BLOCK_SIZE NULL
+ENUM_VALUE_LIST high,low,auto
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT REQUIRED
+VARIABLE_NAME THREAD_POOL_PRIO_KICKUP_TIMER
+SESSION_VALUE NULL
+GLOBAL_VALUE 1000
+GLOBAL_VALUE_ORIGIN COMPILE-TIME
+DEFAULT_VALUE 1000
+VARIABLE_SCOPE GLOBAL
+VARIABLE_TYPE INT UNSIGNED
+VARIABLE_COMMENT The number of milliseconds before a dequeued low-priority statement is moved to the high-priority queue
+NUMERIC_MIN_VALUE 0
+NUMERIC_MAX_VALUE 4294967295
+NUMERIC_BLOCK_SIZE 1
+ENUM_VALUE_LIST NULL
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT REQUIRED
 VARIABLE_NAME THREAD_POOL_SIZE
 SESSION_VALUE NULL
 GLOBAL_VALUE 4

View File

@@ -157,9 +157,9 @@ IF (CMAKE_SYSTEM_NAME MATCHES "Linux" OR
   ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS)
   IF(WIN32)
     SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
-  ELSE()
-    SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc)
   ENDIF()
+  SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
 ENDIF()
MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY

View File

@@ -4425,7 +4425,7 @@ static int init_common_variables()
 #endif /* HAVE_SOLARIS_LARGE_PAGES */
-#if defined(HAVE_POOL_OF_THREADS) && !defined(_WIN32)
+#if defined(HAVE_POOL_OF_THREADS)
   if (IS_SYSVAR_AUTOSIZE(&threadpool_size))
     SYSVAR_AUTOSIZE(threadpool_size, my_getncpus());
 #endif

View File

@@ -697,6 +697,7 @@ typedef struct system_variables
   my_bool session_track_schema;
   my_bool session_track_state_change;
+  ulong threadpool_priority;
 } SV;
 /**

View File

@@ -3241,23 +3241,17 @@ static Sys_var_ulong Sys_thread_cache_size(
 #ifdef HAVE_POOL_OF_THREADS
 static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type)
 {
-#ifdef _WIN32
   tp_set_max_threads(threadpool_max_threads);
-#endif
   return false;
 }
-#ifdef _WIN32
 static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type)
 {
   tp_set_min_threads(threadpool_min_threads);
   return false;
 }
-#endif
-#ifndef _WIN32
 static bool check_threadpool_size(sys_var *self, THD *thd, set_var *var)
 {
   ulonglong v= var->save_result.ulonglong_value;
@@ -3282,7 +3276,6 @@ static bool fix_threadpool_stall_limit(sys_var*, THD*, enum_var_type)
   tp_set_threadpool_stall_limit(threadpool_stall_limit);
   return false;
 }
-#endif
 #ifdef _WIN32
 static Sys_var_uint Sys_threadpool_min_threads(
@@ -3293,7 +3286,24 @@ static Sys_var_uint Sys_threadpool_min_threads(
   NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
   ON_UPDATE(fix_tp_min_threads)
 );
-#else
+static const char *threadpool_mode_names[]={ "windows", "generic", 0 };
+static Sys_var_enum Sys_threadpool_mode(
+  "thread_pool_mode",
+  "Chose implementation of the threadpool",
+  READ_ONLY GLOBAL_VAR(threadpool_mode), CMD_LINE(REQUIRED_ARG),
+  threadpool_mode_names, DEFAULT(TP_MODE_WINDOWS)
+);
+#endif
+static const char *threadpool_priority_names[]={ "high", "low", "auto", 0 };
+static Sys_var_enum Sys_thread_pool_priority(
+  "thread_pool_priority",
+  "Threadpool priority. High priority connections usually start executing earlier than low priority."
+  "If priority set to 'auto', the the actual priority(low or high) is determined based on whether or not connection is inside transaction.",
+  SESSION_VAR(threadpool_priority), CMD_LINE(REQUIRED_ARG),
+  threadpool_priority_names, DEFAULT(TP_PRIORITY_AUTO));
 static Sys_var_uint Sys_threadpool_idle_thread_timeout(
   "thread_pool_idle_timeout",
   "Timeout in seconds for an idle thread in the thread pool."
@@ -3328,7 +3338,7 @@ static Sys_var_uint Sys_threadpool_stall_limit(
   NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
   ON_UPDATE(fix_threadpool_stall_limit)
 );
-#endif /* !WIN32 */
 static Sys_var_uint Sys_threadpool_max_threads(
   "thread_pool_max_threads",
   "Maximum allowed number of worker threads in the thread pool",
@@ -3337,6 +3347,13 @@ static Sys_var_uint Sys_threadpool_max_threads(
   NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
   ON_UPDATE(fix_tp_max_threads)
 );
+static Sys_var_uint Sys_threadpool_threadpool_prio_kickup_timer(
+  "thread_pool_prio_kickup_timer",
+  "The number of milliseconds before a dequeued low-priority statement is moved to the high-priority queue",
+  GLOBAL_VAR(threadpool_prio_kickup_timer), CMD_LINE(REQUIRED_ARG),
+  VALID_RANGE(0, UINT_MAX), DEFAULT(1000), BLOCK_SIZE(1)
+);
 #endif /* HAVE_POOL_OF_THREADS */
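
For readers unfamiliar with enum system variables: the position of each string in the name array is the numeric value that gets stored, so the arrays above have to stay in sync with the TP_MODE_* and TP_PRIORITY_* constants (DEFAULT(TP_PRIORITY_AUTO) only means "auto" because "auto" sits at index 2). A small self-contained illustration of that mapping; name_to_index is not server code.

#include <cstring>

/*
  Order matters: the stored value is the index of the chosen name, matching
  TP_PRIORITY_HIGH=0, TP_PRIORITY_LOW=1, TP_PRIORITY_AUTO=2 (the same idea
  applies to threadpool_mode_names and TP_MODE_WINDOWS/TP_MODE_GENERIC).
*/
static const char *threadpool_priority_names[]= { "high", "low", "auto", 0 };

static int name_to_index(const char **names, const char *value)
{
  for (int i= 0; names[i]; i++)
    if (!strcmp(names[i], value))
      return i;
  return -1;  /* an unknown value would be rejected by the server */
}

int main()
{
  /* DEFAULT(TP_PRIORITY_AUTO) therefore selects index 2, i.e. "auto". */
  return name_to_index(threadpool_priority_names, "auto") == 2 ? 0 : 1;
}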
 /**

View File

@@ -23,28 +23,19 @@ extern uint threadpool_max_size;
extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/
extern uint threadpool_max_threads; /* Maximum threads in pool */
extern uint threadpool_oversubscribe; /* Maximum active threads in group */
extern uint threadpool_prio_kickup_timer; /* Time before low prio item gets prio boost */
#ifdef _WIN32
extern uint threadpool_mode; /* Thread pool implementation , windows or generic */
#define TP_MODE_WINDOWS 0
#define TP_MODE_GENERIC 1
#endif
struct TP_connection;
extern void tp_callback(TP_connection *c);
extern void tp_timeout_handler(TP_connection *c);
/* Common thread pool routines, suitable for different implementations */
extern void threadpool_remove_connection(THD *thd);
extern int threadpool_process_request(THD *thd);
extern THD* threadpool_add_connection(CONNECT *connect, void *scheduled_data);
/*
Functions used by scheduler.
OS-specific implementations are in
threadpool_unix.cc or threadpool_win.cc
*/
extern bool tp_init();
extern void tp_add_connection(CONNECT *);
extern void tp_wait_begin(THD *, int);
extern void tp_wait_end(THD*);
extern void tp_post_kill_notification(THD *thd);
extern void tp_end(void);
/* Used in SHOW for threadpool_idle_thread_count */
extern int tp_get_idle_thread_count();
/*
  Threadpool statistics
@@ -63,9 +54,103 @@ extern void tp_set_min_threads(uint val);
extern void tp_set_max_threads(uint val);
extern void tp_set_threadpool_size(uint val);
extern void tp_set_threadpool_stall_limit(uint val);
extern int tp_get_idle_thread_count();
extern int tp_get_thread_count();
/* Activate threadpool scheduler */
extern void tp_scheduler(void);
extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff,
                                         enum enum_var_type scope);
enum TP_PRIORITY {
TP_PRIORITY_HIGH,
TP_PRIORITY_LOW,
TP_PRIORITY_AUTO
};
enum TP_STATE
{
TP_STATE_IDLE,
TP_STATE_RUNNING,
};
/*
Connection structure, encapsulates THD + structures for asynchronous
IO and pool.
Platform specific parts are specified in subclasses called connection_t,
inside threadpool_win.cc and threadpool_unix.cc
*/
struct TP_connection
{
THD* thd;
CONNECT* connect;
TP_STATE state;
TP_PRIORITY priority;
TP_connection(CONNECT *c) :
thd(0),
connect(c),
state(TP_STATE_IDLE),
priority(TP_PRIORITY_HIGH)
{}
virtual ~TP_connection()
{};
/* Initialize io structures windows threadpool, epoll etc */
virtual int init() = 0;
virtual void set_io_timeout(int sec) = 0;
/* Read for the next client command (async) with specified timeout */
virtual int start_io() = 0;
virtual void wait_begin(int type)= 0;
virtual void wait_end() = 0;
};
struct TP_pool
{
virtual ~TP_pool(){};
virtual int init()= 0;
virtual TP_connection *new_connection(CONNECT *)= 0;
virtual void add(TP_connection *c)= 0;
virtual int set_max_threads(uint){ return 0; }
virtual int set_min_threads(uint){ return 0; }
virtual int set_pool_size(uint){ return 0; }
virtual int set_idle_timeout(uint){ return 0; }
virtual int set_oversubscribe(uint){ return 0; }
virtual int set_stall_limit(uint){ return 0; }
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
};
#ifdef _WIN32
struct TP_pool_win:TP_pool
{
TP_pool_win();
virtual int init();
virtual ~TP_pool_win();
virtual TP_connection *new_connection(CONNECT *c);
virtual void add(TP_connection *);
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
};
#endif
struct TP_pool_generic :TP_pool
{
TP_pool_generic();
~TP_pool_generic();
virtual int init();
virtual TP_connection *new_connection(CONNECT *c);
virtual void add(TP_connection *);
virtual int set_pool_size(uint);
virtual int set_stall_limit(uint);
virtual int get_idle_thread_count();
};

View File

@ -34,14 +34,25 @@ uint threadpool_max_size;
uint threadpool_stall_limit; uint threadpool_stall_limit;
uint threadpool_max_threads; uint threadpool_max_threads;
uint threadpool_oversubscribe; uint threadpool_oversubscribe;
uint threadpool_mode;
uint threadpool_prio_kickup_timer;
/* Stats */ /* Stats */
TP_STATISTICS tp_stats; TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
static int threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys); extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
extern bool do_command(THD*); extern bool do_command(THD*);
static inline TP_connection *get_TP_connection(THD *thd)
{
return (TP_connection *)thd->event_scheduler.data;
}
/* /*
Worker threads contexts, and THD contexts. Worker threads contexts, and THD contexts.
========================================= =========================================
@ -105,14 +116,80 @@ static void thread_attach(THD* thd)
#endif #endif
} }
/*
Determine connection priority , using current
transaction state and 'threadpool_priority' variable value.
*/
static TP_PRIORITY get_priority(TP_connection *c)
{
DBUG_ASSERT(c->thd == current_thd);
TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
if (prio == TP_PRIORITY_AUTO)
{
return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
}
return prio;
}
THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
void tp_callback(TP_connection *c)
{
DBUG_ASSERT(c);
Worker_thread_context worker_context;
worker_context.save();
THD *thd= c->thd;
c->state = TP_STATE_RUNNING;
if (!thd)
{
/* No THD, need to login first. */
DBUG_ASSERT(c->connect);
thd= c->thd= threadpool_add_connection(c->connect, c);
if (!thd)
{
/* Bail out on connect error.*/
goto error;
}
c->connect= 0;
}
else if (threadpool_process_request(thd))
{
/* QUIT or an error occured. */
goto error;
}
/* Set priority */
c->priority= get_priority(c);
/* Read next command from client. */
c->set_io_timeout(thd->variables.net_wait_timeout);
c->state= TP_STATE_IDLE;
if (c->start_io())
goto error;
worker_context.restore();
return;
error:
c->thd= 0;
delete c;
if (thd)
{
threadpool_remove_connection(thd);
}
worker_context.restore();
}
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{ {
THD *thd= NULL; THD *thd= NULL;
int error=1; int error=1;
Worker_thread_context worker_context;
worker_context.save();
/* /*
Create a new connection context: mysys_thread_var and PSI thread Create a new connection context: mysys_thread_var and PSI thread
@ -137,7 +214,6 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
#endif #endif
my_thread_end(); my_thread_end();
} }
worker_context.restore();
return NULL; return NULL;
} }
delete connect; delete connect;
@ -184,17 +260,14 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
threadpool_remove_connection(thd); threadpool_remove_connection(thd);
thd= NULL; thd= NULL;
} }
worker_context.restore();
return thd; return thd;
} }
-void threadpool_remove_connection(THD *thd)
+static void threadpool_remove_connection(THD *thd)
 {
-  Worker_thread_context worker_context;
-  worker_context.save();
   thread_attach(thd);
+  thd->event_scheduler.data= 0;
   thd->net.reading_or_writing = 0;
   end_connection(thd);
   close_connection(thd, 0);
@ -206,19 +279,14 @@ void threadpool_remove_connection(THD *thd)
mysys thread_var and PSI thread. mysys thread_var and PSI thread.
*/ */
my_thread_end(); my_thread_end();
worker_context.restore();
} }
/** /**
Process a single client request or a single batch. Process a single client request or a single batch.
*/ */
-int threadpool_process_request(THD *thd)
+static int threadpool_process_request(THD *thd)
 {
   int retval= 0;
-  Worker_thread_context worker_context;
-  worker_context.save();
   thread_attach(thd);
   if (thd->killed >= KILL_CONNECTION)
@ -268,7 +336,6 @@ int threadpool_process_request(THD *thd)
} }
end: end:
worker_context.restore();
return retval; return retval;
} }
@ -286,6 +353,119 @@ static bool tp_end_thread(THD *, bool)
return 0; return 0;
} }
static TP_pool *pool;
static bool tp_init()
{
#ifdef _WIN32
if (threadpool_mode == TP_MODE_WINDOWS)
pool= new (std::nothrow) TP_pool_win;
else
pool= new (std::nothrow) TP_pool_generic;
#else
pool= new (std::nothrow) TP_pool_generic;
#endif
if (!pool)
return true;
if (pool->init())
{
delete pool;
pool= 0;
return true;
}
return false;
}
static void tp_add_connection(CONNECT *connect)
{
TP_connection *c= pool->new_connection(connect);
DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
if (c)
pool->add(c);
else
connect->close_and_delete();
}
int tp_get_idle_thread_count()
{
return pool? pool->get_idle_thread_count(): 0;
}
int tp_get_thread_count()
{
return pool ? pool->get_thread_count() : 0;
}
void tp_set_min_threads(uint val)
{
if (pool)
pool->set_min_threads(val);
}
void tp_set_max_threads(uint val)
{
if (pool)
pool->set_max_threads(val);
}
void tp_set_threadpool_size(uint val)
{
if (pool)
pool->set_pool_size(val);
}
void tp_set_threadpool_stall_limit(uint val)
{
if (pool)
pool->set_stall_limit(val);
}
void tp_timeout_handler(TP_connection *c)
{
if (c->state != TP_STATE_IDLE)
return;
THD *thd=c->thd;
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->killed= KILL_CONNECTION;
c->priority= TP_PRIORITY_HIGH;
post_kill_notification(thd);
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
static void tp_wait_begin(THD *thd, int type)
{
TP_connection *c = get_TP_connection(thd);
if (c)
c->wait_begin(type);
}
static void tp_wait_end(THD *thd)
{
TP_connection *c = get_TP_connection(thd);
if (c)
c->wait_end();
}
static void tp_end()
{
delete pool;
}
static void tp_post_kill_notification(THD *thd)
{
TP_connection *c= get_TP_connection(thd);
if (c)
c->priority= TP_PRIORITY_HIGH;
post_kill_notification(thd);
}
static scheduler_functions tp_scheduler_functions= static scheduler_functions tp_scheduler_functions=
{ {
0, // max_threads 0, // max_threads
@@ -296,7 +476,7 @@ static scheduler_functions tp_scheduler_functions=
   tp_add_connection,              // add_connection
   tp_wait_begin,                  // thd_wait_begin
   tp_wait_end,                    // thd_wait_end
-  post_kill_notification,         // post_kill_notification
+  tp_post_kill_notification,      // post kill notification
   tp_end_thread,                  // Dummy function
   tp_end                          // end
 };

View File

@ -22,6 +22,17 @@
#ifdef HAVE_POOL_OF_THREADS #ifdef HAVE_POOL_OF_THREADS
#ifdef _WIN32
/* AIX may define this, too ?*/
#define HAVE_IOCP
#endif
#ifdef HAVE_IOCP
#define OPTIONAL_IO_POLL_READ_PARAM &overlapped
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif
#include <sql_connect.h> #include <sql_connect.h>
#include <mysqld.h> #include <mysqld.h>
#include <debug_sync.h> #include <debug_sync.h>
@ -38,10 +49,23 @@ typedef struct kevent native_event;
#elif defined (__sun) #elif defined (__sun)
#include <port.h> #include <port.h>
typedef port_event_t native_event; typedef port_event_t native_event;
#elif defined (HAVE_IOCP)
typedef OVERLAPPED_ENTRY native_event;
#else #else
#error threadpool is not available on this platform #error threadpool is not available on this platform
#endif #endif
static void io_poll_close(int fd)
{
#ifdef _WIN32
CloseHandle((HANDLE)fd);
#else
close(fd);
#endif
}
/** Maximum number of native events a listener can read in one go */ /** Maximum number of native events a listener can read in one go */
#define MAX_EVENTS 1024 #define MAX_EVENTS 1024
@ -108,32 +132,45 @@ typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
> >
worker_list_t; worker_list_t;
-struct connection_t
+struct TP_connection_generic:public TP_connection
 {
+  TP_connection_generic(CONNECT *c);
+  ~TP_connection_generic();
+  virtual int init(){ return 0; };
+  virtual void set_io_timeout(int sec);
+  virtual int start_io();
+  virtual void wait_begin(int type);
+  virtual void wait_end();
-  THD *thd;
   thread_group_t *thread_group;
-  connection_t *next_in_queue;
-  connection_t **prev_in_queue;
+  TP_connection_generic *next_in_queue;
+  TP_connection_generic **prev_in_queue;
   ulonglong abs_wait_timeout;
-  CONNECT* connect;
-  bool logged_in;
+  ulonglong dequeue_time;
   bool bound_to_poll_descriptor;
-  bool waiting;
+  int waiting;
+#ifdef HAVE_IOCP
+  OVERLAPPED overlapped;
+#endif
 };
-typedef I_P_List<connection_t,
-                 I_P_List_adapter<connection_t,
-                                  &connection_t::next_in_queue,
-                                  &connection_t::prev_in_queue>,
+typedef TP_connection_generic TP_connection_generic;
+typedef I_P_List<TP_connection_generic,
+                 I_P_List_adapter<TP_connection_generic,
+                                  &TP_connection_generic::next_in_queue,
+                                  &TP_connection_generic::prev_in_queue>,
                  I_P_List_null_counter,
-                 I_P_List_fast_push_back<connection_t> >
+                 I_P_List_fast_push_back<TP_connection_generic> >
 connection_queue_t;
const int NQUEUES=2; /* We have high and low priority queues*/
 struct thread_group_t
 {
   mysql_mutex_t mutex;
-  connection_queue_t queue;
+  connection_queue_t queues[NQUEUES];
   worker_list_t waiting_threads;
   worker_thread_t *listener;
   pthread_attr_t *pthread_attr;
@@ -148,8 +185,7 @@ struct thread_group_t
   int shutdown_pipe[2];
   bool shutdown;
   bool stalled;
-} MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE);
+} MY_ALIGNED(512);
 static thread_group_t *all_groups;
 static uint group_count;
@@ -175,15 +211,13 @@ struct pool_timer_t
 static pool_timer_t pool_timer;
-static void queue_put(thread_group_t *thread_group, connection_t *connection);
+static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
+static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
 static int wake_thread(thread_group_t *thread_group);
-static void handle_event(connection_t *connection);
 static int wake_or_create_thread(thread_group_t *thread_group);
 static int create_worker(thread_group_t *thread_group);
 static void *worker_main(void *param);
 static void check_stall(thread_group_t *thread_group);
-static void connection_abort(connection_t *connection);
-static void set_wait_timeout(connection_t *connection);
 static void set_next_timeout_check(ulonglong abstime);
 static void print_pool_blocked_message(bool);
@@ -194,12 +228,12 @@ static void print_pool_blocked_message(bool);
   This maps to different APIs on different Unixes.
   Supported are currently Linux with epoll, Solaris with event ports,
-  OSX and BSD with kevent. All those API's are used with one-shot flags
+  OSX and BSD with kevent, Windows with IOCP. All those API's are used with one-shot flags
   (the event is signalled once client has written something into the socket,
   then socket is removed from the "poll-set" until the command is finished,
   and we need to re-arm/re-register socket)
-  No implementation for poll/select/AIO is currently provided.
+  No implementation for poll/select is currently provided.
   The API closely resembles all of the above mentioned platform APIs
   and consists of following functions.
@@ -208,7 +242,7 @@
   Creates an io_poll descriptor
   On Linux: epoll_create()
-  - io_poll_associate_fd(int poll_fd, int fd, void *data)
+  - io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt)
   Associate file descriptor with io poll descriptor
   On Linux : epoll_ctl(..EPOLL_CTL_ADD))
@@ -217,7 +251,7 @@
   On Linux: epoll_ctl(..EPOLL_CTL_DEL)
-  - io_poll_start_read(int poll_fd,int fd, void *data)
+  - io_poll_start_read(int poll_fd,int fd, void *data, void *opt)
   The same as io_poll_associate_fd(), but cannot be used before
   io_poll_associate_fd() was called.
   On Linux : epoll_ctl(..EPOLL_CTL_MOD)
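
The comment above describes the thin io_poll_* wrapper in terms of Linux epoll. Purely to illustrate the one-shot pattern it refers to (register with EPOLLONESHOT, then re-arm with EPOLL_CTL_MOD once the command has been processed), here is a standalone sketch; it assumes Linux, uses invented names my_associate_fd/my_start_read, and is not taken from the patch.

#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>

/* One-shot registration (mirrors io_poll_associate_fd). */
static int my_associate_fd(int pollfd, int fd, void *data)
{
  struct epoll_event ev;
  ev.data.u64= 0;                 /* keep the unused bytes defined */
  ev.data.ptr= data;
  ev.events= EPOLLIN | EPOLLONESHOT;
  return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Re-arm after the command was processed (mirrors io_poll_start_read). */
static int my_start_read(int pollfd, int fd, void *data)
{
  struct epoll_event ev;
  ev.data.u64= 0;
  ev.data.ptr= data;
  ev.events= EPOLLIN | EPOLLONESHOT;
  return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}

int main()
{
  int fds[2];
  if (pipe(fds))
    return 1;
  int pollfd= epoll_create(1);
  my_associate_fd(pollfd, fds[0], (void*)"conn");

  char b= 'x';
  ssize_t r= write(fds[1], &b, 1);
  (void) r;
  struct epoll_event out;
  printf("first wait:  %d event(s)\n", epoll_wait(pollfd, &out, 1, 0)); /* 1 */
  printf("second wait: %d event(s)\n", epoll_wait(pollfd, &out, 1, 0)); /* 0: fd disabled */
  my_start_read(pollfd, fds[0], (void*)"conn");                         /* re-arm */
  printf("after rearm: %d event(s)\n", epoll_wait(pollfd, &out, 1, 0)); /* 1 again */
  close(pollfd); close(fds[0]); close(fds[1]);
  return 0;
}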
@ -245,7 +279,7 @@ static int io_poll_create()
} }
int io_poll_associate_fd(int pollfd, int fd, void *data) int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
{ {
struct epoll_event ev; struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */ ev.data.u64= 0; /* Keep valgrind happy */
@ -256,7 +290,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data)
int io_poll_start_read(int pollfd, int fd, void *data) int io_poll_start_read(int pollfd, int fd, void *data, void *)
{ {
struct epoll_event ev; struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */ ev.data.u64= 0; /* Keep valgrind happy */
@ -315,7 +349,7 @@ int io_poll_create()
return kqueue(); return kqueue();
} }
int io_poll_start_read(int pollfd, int fd, void *data) int io_poll_start_read(int pollfd, int fd, void *data,void *)
{ {
struct kevent ke; struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
@ -324,12 +358,12 @@ int io_poll_start_read(int pollfd, int fd, void *data)
} }
int io_poll_associate_fd(int pollfd, int fd, void *data) int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
{ {
struct kevent ke; struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
0, 0, data); 0, 0, data);
return io_poll_start_read(pollfd,fd, data); return io_poll_start_read(pollfd,fd, data, 0);
} }
@ -371,14 +405,14 @@ static int io_poll_create()
return port_create(); return port_create();
} }
int io_poll_start_read(int pollfd, int fd, void *data) int io_poll_start_read(int pollfd, int fd, void *data, void *)
{ {
return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data); return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
} }
static int io_poll_associate_fd(int pollfd, int fd, void *data) static int io_poll_associate_fd(int pollfd, int fd, void *data, void *)
{ {
return io_poll_start_read(pollfd, fd, data); return io_poll_start_read(pollfd, fd, data, 0);
} }
int io_poll_disassociate_fd(int pollfd, int fd) int io_poll_disassociate_fd(int pollfd, int fd)
@ -410,23 +444,115 @@ static void* native_event_get_userdata(native_event *event)
{ {
return event->portev_user; return event->portev_user;
} }
#elif defined(HAVE_IOCP)
static int io_poll_create()
{
HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
return (int)h;
}
int io_poll_start_read(int pollfd, int fd, void *, void *opt)
{
DWORD num_bytes = 0;
static char c;
WSABUF buf;
buf.buf= &c;
buf.len= 0;
DWORD flags=0;
if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0)
return 0;
if (GetLastError() == ERROR_IO_PENDING)
return 0;
return 1;
}
static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt)
{
HANDLE h= CreateIoCompletionPort((HANDLE)fd, (HANDLE)pollfd, (ULONG_PTR)data, 0);
if (!h)
return -1;
return io_poll_start_read(pollfd,fd, 0, opt);
}
int io_poll_disassociate_fd(int pollfd, int fd)
{
/* Not possible to unbind/rebind file descriptor in IOCP. */
return 0;
}
int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
{
ULONG n;
BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events,
maxevents, &n, timeout_ms, FALSE);
return ok ? (int)n : -1;
}
static void* native_event_get_userdata(native_event *event)
{
return (void *)event->lpCompletionKey;
}
#endif #endif
 /* Dequeue element from a workqueue */
-static connection_t *queue_get(thread_group_t *thread_group)
+static TP_connection_generic *queue_get(thread_group_t *thread_group)
 {
   DBUG_ENTER("queue_get");
   thread_group->queue_event_count++;
-  connection_t *c= thread_group->queue.front();
-  if (c)
-  {
-    thread_group->queue.remove(c);
-  }
-  DBUG_RETURN(c);
+  TP_connection_generic *c;
+  for (int i=0; i < NQUEUES;i++)
+  {
+    c= thread_group->queues[i].pop_front();
+    if (c)
+      DBUG_RETURN(c);
+  }
+  DBUG_RETURN(0);
 }
static bool is_queue_empty(thread_group_t *thread_group)
{
for (int i=0; i < NQUEUES; i++)
{
if (!thread_group->queues[i].is_empty())
return false;
}
return true;
}
static void queue_init(thread_group_t *thread_group)
{
for (int i=0; i < NQUEUES; i++)
{
thread_group->queues[i].empty();
}
}
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
{
ulonglong now= pool_timer.current_microtime;
for(int i=0; i < cnt; i++)
{
TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
c->dequeue_time= now;
thread_group->queues[c->priority].push_back(c);
}
}
/* /*
Handle wait timeout : Handle wait timeout :
@ -450,7 +576,7 @@ static void timeout_check(pool_timer_t *timer)
if (thd->net.reading_or_writing != 1) if (thd->net.reading_or_writing != 1)
continue; continue;
connection_t *connection= (connection_t *)thd->event_scheduler.data; TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
if (!connection) if (!connection)
{ {
/* /*
@ -462,11 +588,7 @@ static void timeout_check(pool_timer_t *timer)
if(connection->abs_wait_timeout < timer->current_microtime) if(connection->abs_wait_timeout < timer->current_microtime)
{ {
/* Wait timeout exceeded, kill connection. */ tp_timeout_handler(connection);
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->killed = KILL_CONNECTION;
post_kill_notification(thd);
mysql_mutex_unlock(&thd->LOCK_thd_data);
} }
else else
{ {
@@ -545,10 +667,23 @@ static void* timer_thread(void *param)
 void check_stall(thread_group_t *thread_group)
 {
-  if (mysql_mutex_trylock(&thread_group->mutex) != 0)
-  {
-    /* Something happens. Don't disturb */
-    return;
-  }
+  mysql_mutex_lock(&thread_group->mutex);
+  /*
+    Bump priority for the low priority connections that spent too much
+    time in low prio queue.
+  */
+  TP_connection_generic *c;
+  for (;;)
+  {
+    c= thread_group->queues[TP_PRIORITY_LOW].front();
+    if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer)
+    {
+      thread_group->queues[TP_PRIORITY_LOW].remove(c);
+      thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
+    }
+    else
+      break;
+  }
/* /*
@ -593,7 +728,7 @@ void check_stall(thread_group_t *thread_group)
do wait and indicate that via thd_wait_begin/end callbacks, thread creation do wait and indicate that via thd_wait_begin/end callbacks, thread creation
will be faster. will be faster.
*/ */
if (!thread_group->queue.is_empty() && !thread_group->queue_event_count) if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
{ {
thread_group->stalled= true; thread_group->stalled= true;
wake_or_create_thread(thread_group); wake_or_create_thread(thread_group);
@ -636,11 +771,11 @@ static void stop_timer(pool_timer_t *timer)
@return a ready connection, or NULL on shutdown @return a ready connection, or NULL on shutdown
*/ */
static connection_t * listener(worker_thread_t *current_thread, static TP_connection_generic * listener(worker_thread_t *current_thread,
thread_group_t *thread_group) thread_group_t *thread_group)
{ {
DBUG_ENTER("listener"); DBUG_ENTER("listener");
connection_t *retval= NULL; TP_connection_generic *retval= NULL;
for(;;) for(;;)
{ {
@ -707,28 +842,17 @@ static connection_t * listener(worker_thread_t *current_thread,
and wake a worker. and wake a worker.
NOTE: Currently nothing is done to detect or prevent long queuing times. NOTE: Currently nothing is done to detect or prevent long queuing times.
A solutionc for the future would be to give up "one active thread per A solution for the future would be to give up "one active thread per
group" principle, if events stay in the queue for too long, and just wake group" principle, if events stay in the queue for too long, and just wake
more workers. more workers.
*/ */
bool listener_picks_event= thread_group->queue.is_empty(); bool listener_picks_event=is_queue_empty(thread_group);
queue_put(thread_group, ev, cnt);
/*
If listener_picks_event is set, listener thread will handle first event,
and put the rest into the queue. If listener_pick_event is not set, all
events go to the queue.
*/
for(int i=(listener_picks_event)?1:0; i < cnt ; i++)
{
connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
thread_group->queue.push_back(c);
}
if (listener_picks_event) if (listener_picks_event)
{ {
/* Handle the first event. */ /* Handle the first event. */
retval= (connection_t *)native_event_get_userdata(&ev[0]); retval= queue_get(thread_group);
mysql_mutex_unlock(&thread_group->mutex); mysql_mutex_unlock(&thread_group->mutex);
break; break;
} }
@ -914,7 +1038,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
thread_group->pollfd= -1; thread_group->pollfd= -1;
thread_group->shutdown_pipe[0]= -1; thread_group->shutdown_pipe[0]= -1;
thread_group->shutdown_pipe[1]= -1; thread_group->shutdown_pipe[1]= -1;
thread_group->queue.empty(); queue_init(thread_group);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
@ -924,9 +1048,10 @@ void thread_group_destroy(thread_group_t *thread_group)
mysql_mutex_destroy(&thread_group->mutex); mysql_mutex_destroy(&thread_group->mutex);
if (thread_group->pollfd != -1) if (thread_group->pollfd != -1)
{ {
close(thread_group->pollfd); io_poll_close(thread_group->pollfd);
thread_group->pollfd= -1; thread_group->pollfd= -1;
} }
#ifndef HAVE_IOCP
for(int i=0; i < 2; i++) for(int i=0; i < 2; i++)
{ {
if(thread_group->shutdown_pipe[i] != -1) if(thread_group->shutdown_pipe[i] != -1)
@ -935,6 +1060,8 @@ void thread_group_destroy(thread_group_t *thread_group)
thread_group->shutdown_pipe[i]= -1; thread_group->shutdown_pipe[i]= -1;
} }
} }
#endif
if (my_atomic_add32(&shutdown_group_count, -1) == 1) if (my_atomic_add32(&shutdown_group_count, -1) == 1)
my_free(all_groups); my_free(all_groups);
} }
@ -957,7 +1084,32 @@ static int wake_thread(thread_group_t *thread_group)
DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
} }
/*
Wake listener thread (during shutdown)
Self-pipe trick is used in most cases,except IOCP.
*/
static int wake_listener(thread_group_t *thread_group)
{
#ifndef HAVE_IOCP
if (pipe(thread_group->shutdown_pipe))
{
return -1;
}
/* Wake listener */
if (io_poll_associate_fd(thread_group->pollfd,
thread_group->shutdown_pipe[0], NULL, NULL))
{
return -1;
}
char c= 0;
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
return -1;
#else
PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0);
#endif
return 0;
}
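
wake_listener() above relies on the classic self-pipe trick everywhere except IOCP, where PostQueuedCompletionStatus() can post a wake-up completion directly. For readers unfamiliar with the trick, a minimal standalone demonstration follows; it assumes POSIX/Linux, is unrelated to the server code, and simply shows a second thread interrupting a blocked poll() by writing one byte into a pipe the poller watches.

#include <poll.h>
#include <unistd.h>
#include <pthread.h>
#include <cstdio>

static int wake_pipe[2];

/* Any thread can interrupt a blocked poll() just by writing one byte. */
static void *waker(void *)
{
  usleep(100000);                       /* pretend some shutdown work happens first */
  char c= 0;
  ssize_t r= write(wake_pipe[1], &c, 1);
  (void) r;
  return 0;
}

int main()
{
  if (pipe(wake_pipe))
    return 1;

  pthread_t t;
  pthread_create(&t, 0, waker, 0);

  struct pollfd pfd= { wake_pipe[0], POLLIN, 0 };
  int n= poll(&pfd, 1, -1);             /* would block forever without the wake-up */
  printf("listener woken, poll() returned %d\n", n);

  pthread_join(t, 0);
  close(wake_pipe[0]);
  close(wake_pipe[1]);
  return 0;
}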
/** /**
Initiate shutdown for thread group. Initiate shutdown for thread group.
@ -981,20 +1133,7 @@ static void thread_group_close(thread_group_t *thread_group)
thread_group->shutdown= true; thread_group->shutdown= true;
thread_group->listener= NULL; thread_group->listener= NULL;
if (pipe(thread_group->shutdown_pipe)) wake_listener(thread_group);
{
DBUG_VOID_RETURN;
}
/* Wake listener */
if (io_poll_associate_fd(thread_group->pollfd,
thread_group->shutdown_pipe[0], NULL))
{
DBUG_VOID_RETURN;
}
char c= 0;
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
DBUG_VOID_RETURN;
/* Wake all workers. */ /* Wake all workers. */
while(wake_thread(thread_group) == 0) while(wake_thread(thread_group) == 0)
@ -1015,18 +1154,16 @@ static void thread_group_close(thread_group_t *thread_group)
*/ */
static void queue_put(thread_group_t *thread_group, connection_t *connection) static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection)
{ {
DBUG_ENTER("queue_put"); DBUG_ENTER("queue_put");
mysql_mutex_lock(&thread_group->mutex); connection->dequeue_time= pool_timer.current_microtime;
thread_group->queue.push_back(connection); thread_group->queues[connection->priority].push_back(connection);
if (thread_group->active_thread_count == 0) if (thread_group->active_thread_count == 0)
wake_or_create_thread(thread_group); wake_or_create_thread(thread_group);
mysql_mutex_unlock(&thread_group->mutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
@ -1061,18 +1198,19 @@ static bool too_many_threads(thread_group_t *thread_group)
NULL is returned if timeout has expired,or on shutdown. NULL is returned if timeout has expired,or on shutdown.
*/ */
connection_t *get_event(worker_thread_t *current_thread, TP_connection_generic *get_event(worker_thread_t *current_thread,
thread_group_t *thread_group, struct timespec *abstime) thread_group_t *thread_group, struct timespec *abstime)
{ {
DBUG_ENTER("get_event"); DBUG_ENTER("get_event");
connection_t *connection = NULL; TP_connection_generic *connection = NULL;
int err=0;
mysql_mutex_lock(&thread_group->mutex); mysql_mutex_lock(&thread_group->mutex);
DBUG_ASSERT(thread_group->active_thread_count >= 0); DBUG_ASSERT(thread_group->active_thread_count >= 0);
for(;;) for(;;)
{ {
int err=0;
bool oversubscribed = too_many_threads(thread_group); bool oversubscribed = too_many_threads(thread_group);
if (thread_group->shutdown) if (thread_group->shutdown)
break; break;
@ -1101,21 +1239,26 @@ connection_t *get_event(worker_thread_t *current_thread,
break; break;
} }
/* /*
Last thing we try before going to sleep is to Last thing we try before going to sleep is to
pick a single event via epoll, without waiting (timeout 0) non-blocking event poll, i.e with timeout = 0.
If this returns events, pick one
*/ */
if (!oversubscribed) if (!oversubscribed)
{ {
native_event nev;
if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1) native_event ev[MAX_EVENTS];
int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
if (cnt > 0)
{ {
thread_group->io_event_count++; queue_put(thread_group, ev, cnt);
connection = (connection_t *)native_event_get_userdata(&nev); connection= queue_get(thread_group);
break; break;
} }
} }
/* And now, finally sleep */ /* And now, finally sleep */
current_thread->woken = false; /* wake() sets this to true */ current_thread->woken = false; /* wake() sets this to true */
@ -1175,7 +1318,7 @@ void wait_begin(thread_group_t *thread_group)
DBUG_ASSERT(thread_group->connection_count > 0); DBUG_ASSERT(thread_group->connection_count > 0);
if ((thread_group->active_thread_count == 0) && if ((thread_group->active_thread_count == 0) &&
(thread_group->queue.is_empty() || !thread_group->listener)) (is_queue_empty(thread_group) || !thread_group->listener))
{ {
/* /*
Group might stall while this thread waits, thus wake Group might stall while this thread waits, thus wake
@ -1202,103 +1345,47 @@ void wait_end(thread_group_t *thread_group)
} }
/**
Allocate/initialize a new connection structure.
*/
connection_t *alloc_connection()
TP_connection * TP_pool_generic::new_connection(CONNECT *c)
{ {
connection_t* connection; return new (std::nothrow) TP_connection_generic(c);
DBUG_ENTER("alloc_connection");
DBUG_EXECUTE_IF("simulate_failed_connection_1", DBUG_RETURN(0); );
if ((connection = (connection_t *)my_malloc(sizeof(connection_t),0)))
{
connection->waiting= false;
connection->logged_in= false;
connection->bound_to_poll_descriptor= false;
connection->abs_wait_timeout= ULONGLONG_MAX;
connection->thd= 0;
} }
DBUG_RETURN(connection);
}
/** /**
Add a new connection to thread pool.. Add a new connection to thread pool..
*/ */
void tp_add_connection(CONNECT *connect) void TP_pool_generic::add(TP_connection *c)
{ {
connection_t *connection;
DBUG_ENTER("tp_add_connection"); DBUG_ENTER("tp_add_connection");
connection= alloc_connection(); TP_connection_generic *connection=(TP_connection_generic *)c;
if (!connection) thread_group_t *thread_group= connection->thread_group;
{
connect->close_and_delete();
DBUG_VOID_RETURN;
}
connection->connect= connect;
/* Assign connection to a group. */
thread_group_t *group=
&all_groups[connect->thread_id%group_count];
connection->thread_group=group;
mysql_mutex_lock(&group->mutex);
group->connection_count++;
mysql_mutex_unlock(&group->mutex);
/* /*
Add connection to the work queue.Actual logon Add connection to the work queue.Actual logon
will be done by a worker thread. will be done by a worker thread.
*/ */
queue_put(group, connection); mysql_mutex_lock(&thread_group->mutex);
queue_put(thread_group, connection);
mysql_mutex_unlock(&thread_group->mutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/**
Terminate connection.
*/
static void connection_abort(connection_t *connection)
{
DBUG_ENTER("connection_abort");
thread_group_t *group= connection->thread_group;
if (connection->thd)
{
threadpool_remove_connection(connection->thd);
}
mysql_mutex_lock(&group->mutex);
group->connection_count--;
mysql_mutex_unlock(&group->mutex);
my_free(connection);
DBUG_VOID_RETURN;
}
/** /**
MySQL scheduler callback: wait begin MySQL scheduler callback: wait begin
*/ */
void tp_wait_begin(THD *thd, int type) void TP_connection_generic::wait_begin(int type)
{ {
DBUG_ENTER("tp_wait_begin"); DBUG_ENTER("wait_begin");
DBUG_ASSERT(thd);
connection_t *connection = (connection_t *)thd->event_scheduler.data; DBUG_ASSERT(!waiting);
if (connection) waiting++;
{ if (waiting == 1)
DBUG_ASSERT(!connection->waiting); ::wait_begin(thread_group);
connection->waiting= true;
wait_begin(connection->thread_group);
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
@ -1307,18 +1394,13 @@ void tp_wait_begin(THD *thd, int type)
MySQL scheduler callback: wait end MySQL scheduler callback: wait end
*/ */
void tp_wait_end(THD *thd) void TP_connection_generic::wait_end()
{ {
DBUG_ENTER("tp_wait_end"); DBUG_ENTER("wait_end");
DBUG_ASSERT(thd); DBUG_ASSERT(waiting);
waiting--;
connection_t *connection = (connection_t *)thd->event_scheduler.data; if (waiting == 0)
if (connection) ::wait_end(thread_group);
{
DBUG_ASSERT(connection->waiting);
connection->waiting = false;
wait_end(connection->thread_group);
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
@ -1335,12 +1417,41 @@ static void set_next_timeout_check(ulonglong abstime)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
TP_connection_generic::TP_connection_generic(CONNECT *c):
TP_connection(c),
thread_group(0),
next_in_queue(0),
prev_in_queue(0),
abs_wait_timeout(ULONGLONG_MAX),
bound_to_poll_descriptor(false),
waiting(false)
#ifdef HAVE_IOCP
, overlapped()
#endif
{
/* Assign connection to a group. */
thread_group_t *group=
&all_groups[c->thread_id%group_count];
thread_group=group;
mysql_mutex_lock(&group->mutex);
group->connection_count++;
mysql_mutex_unlock(&group->mutex);
}
TP_connection_generic::~TP_connection_generic()
{
mysql_mutex_lock(&thread_group->mutex);
thread_group->connection_count--;
mysql_mutex_unlock(&thread_group->mutex);
}
/** /**
Set wait timeout for connection. Set wait timeout for connection.
*/ */
static void set_wait_timeout(connection_t *c) void TP_connection_generic::set_io_timeout(int timeout_sec)
{ {
DBUG_ENTER("set_wait_timeout"); DBUG_ENTER("set_wait_timeout");
/* /*
@ -1351,11 +1462,11 @@ static void set_wait_timeout(connection_t *c)
one tick interval. one tick interval.
*/ */
c->abs_wait_timeout= pool_timer.current_microtime + abs_wait_timeout= pool_timer.current_microtime +
1000LL*pool_timer.tick_interval + 1000LL*pool_timer.tick_interval +
1000000LL*c->thd->variables.net_wait_timeout; 1000000LL*timeout_sec;
set_next_timeout_check(c->abs_wait_timeout); set_next_timeout_check(abs_wait_timeout);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
@ -1367,7 +1478,7 @@ static void set_wait_timeout(connection_t *c)
after thread_pool_size setting. after thread_pool_size setting.
*/ */
static int change_group(connection_t *c, static int change_group(TP_connection_generic *c,
thread_group_t *old_group, thread_group_t *old_group,
thread_group_t *new_group) thread_group_t *new_group)
{ {
@ -1398,10 +1509,11 @@ static int change_group(connection_t *c,
} }
static int start_io(connection_t *connection) int TP_connection_generic::start_io()
{ {
int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket); int fd= mysql_socket_getfd(thd->net.vio->mysql_socket);
#ifndef HAVE_IOCP
/* /*
Usually, connection will stay in the same group for the entire Usually, connection will stay in the same group for the entire
connection's life. However, we do allow group_count to connection's life. However, we do allow group_count to
@ -1413,56 +1525,25 @@ static int start_io(connection_t *connection)
on thread_id and current group count, and migrate if necessary. on thread_id and current group count, and migrate if necessary.
*/ */
thread_group_t *group = thread_group_t *group =
&all_groups[connection->thd->thread_id%group_count]; &all_groups[thd->thread_id%group_count];
if (group != connection->thread_group) if (group != thread_group)
{ {
if (change_group(connection, connection->thread_group, group)) if (change_group(this, thread_group, group))
return -1; return -1;
} }
#endif
/* /*
Bind to poll descriptor if not yet done. Bind to poll descriptor if not yet done.
*/ */
if (!connection->bound_to_poll_descriptor) if (!bound_to_poll_descriptor)
{ {
connection->bound_to_poll_descriptor= true; bound_to_poll_descriptor= true;
return io_poll_associate_fd(group->pollfd, fd, connection); return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
} }
return io_poll_start_read(group->pollfd, fd, connection); return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
}
static void handle_event(connection_t *connection)
{
DBUG_ENTER("handle_event");
int err;
if (!connection->logged_in)
{
connection->thd = threadpool_add_connection(connection->connect, connection);
err= (connection->thd == NULL);
connection->logged_in= true;
}
else
{
err= threadpool_process_request(connection->thd);
}
if(err)
goto end;
set_wait_timeout(connection);
err= start_io(connection);
end:
if (err)
connection_abort(connection);
DBUG_VOID_RETURN;
} }
@ -1490,14 +1571,14 @@ static void *worker_main(void *param)
/* Run event loop */ /* Run event loop */
for(;;) for(;;)
{ {
connection_t *connection; TP_connection_generic *connection;
struct timespec ts; struct timespec ts;
set_timespec(ts,threadpool_idle_timeout); set_timespec(ts,threadpool_idle_timeout);
connection = get_event(&this_thread, thread_group, &ts); connection = get_event(&this_thread, thread_group, &ts);
if (!connection) if (!connection)
break; break;
this_thread.event_count++; this_thread.event_count++;
handle_event(connection); tp_callback(connection);
} }
/* Thread shutdown: cleanup per-worker-thread structure. */ /* Thread shutdown: cleanup per-worker-thread structure. */
@ -1518,30 +1599,33 @@ static void *worker_main(void *param)
} }
- bool tp_init()
+ TP_pool_generic::TP_pool_generic()
+ {}
+ int TP_pool_generic::init()
{
- DBUG_ENTER("tp_init");
+ DBUG_ENTER("TP_pool_generic::TP_pool_generic");
  threadpool_max_size= MY_MAX(threadpool_size, 128);
  all_groups= (thread_group_t *)
    my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
  if (!all_groups)
  {
    threadpool_max_size= 0;
-   DBUG_RETURN(1);
+   sql_print_error("Allocation failed");
+   DBUG_RETURN(-1);
  }
- threadpool_started= true;
  scheduler_init();
+ threadpool_started= true;
  for (uint i= 0; i < threadpool_max_size; i++)
  {
    thread_group_init(&all_groups[i], get_connection_attrib());
  }
- tp_set_threadpool_size(threadpool_size);
+ set_pool_size(threadpool_size);
  if(group_count == 0)
  {
    /* Something went wrong */
    sql_print_error("Can't set threadpool size to %d",threadpool_size);
-   DBUG_RETURN(1);
+   DBUG_RETURN(-1);
  }
  PSI_register(mutex);
  PSI_register(cond);
@ -1552,7 +1636,7 @@ bool tp_init()
  DBUG_RETURN(0);
}
- void tp_end()
+ TP_pool_generic::~TP_pool_generic()
{
  DBUG_ENTER("tp_end");
@ -1571,12 +1655,9 @@ void tp_end()
/** Ensure that poll descriptors are created when threadpool_size changes */
- void tp_set_threadpool_size(uint size)
+ int TP_pool_generic::set_pool_size(uint size)
{
  bool success= true;
- if (!threadpool_started)
-   return;
  for(uint i=0; i< size; i++)
  {
@ -1596,20 +1677,20 @@ void tp_set_threadpool_size(uint size)
    if (!success)
    {
      group_count= i;
-     return;
+     return -1;
    }
  }
  group_count= size;
+ return 0;
}
- void tp_set_threadpool_stall_limit(uint limit)
+ int TP_pool_generic::set_stall_limit(uint limit)
{
- if (!threadpool_started)
-   return;
  mysql_mutex_lock(&(pool_timer.mutex));
  pool_timer.tick_interval= limit;
  mysql_mutex_unlock(&(pool_timer.mutex));
  mysql_cond_signal(&(pool_timer.cond));
+ return 0;
}
@ -1620,7 +1701,7 @@ void tp_set_threadpool_stall_limit(uint limit)
  Don't do any locking, it is not required for stats.
*/
- int tp_get_idle_thread_count()
+ int TP_pool_generic::get_idle_thread_count()
{
  int sum=0;
  for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++)

View File

@ -64,8 +64,9 @@ static void tp_log_warning(const char *msg, const char *fct)
}
- PTP_POOL pool;
+ static PTP_POOL pool;
- DWORD fls;
+ static TP_CALLBACK_ENVIRON callback_environ;
+ static DWORD fls;
static bool skip_completion_port_on_success = false;
@ -85,13 +86,16 @@ static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
  PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io);
+ static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work);
static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
  PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result);
static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance,
  PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result);
- static void check_thread_init();
+ static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance);
/* Get current time as Windows time */
static ulonglong now()
@ -101,74 +105,86 @@ static ulonglong now()
  return current_time;
}
- /*
-   Connection structure, encapsulates THD + structures for asynchronous
-   IO and pool.
- */
- struct connection_t
+ struct TP_connection_win:public TP_connection
{
- THD *thd;
+ public:
+   TP_connection_win(CONNECT*);
+   ~TP_connection_win();
+   virtual int init();
+   virtual int start_io();
+   virtual void set_io_timeout(int sec);
+   virtual void wait_begin(int type);
+   virtual void wait_end();
+   ulonglong timeout;
+   enum_vio_type vio_type;
  HANDLE handle;
  OVERLAPPED overlapped;
- /* absolute time for wait timeout (as Windows time) */
- volatile ulonglong timeout;
- TP_CALLBACK_ENVIRON callback_environ;
+ PTP_CALLBACK_INSTANCE callback_instance;
  PTP_IO io;
  PTP_TIMER timer;
  PTP_WAIT shm_read;
- /* Callback instance, used to inform treadpool about long callbacks */
- PTP_CALLBACK_INSTANCE callback_instance;
+ PTP_WORK work;
+ bool long_callback;
- CONNECT* connect;
- bool logged_in;
};
- void init_connection(connection_t *connection, CONNECT *connect)
- {
-   connection->logged_in = false;
-   connection->handle= 0;
-   connection->io= 0;
-   connection->shm_read= 0;
-   connection->timer= 0;
-   connection->logged_in = false;
-   connection->timeout= ULONGLONG_MAX;
-   connection->callback_instance= 0;
-   connection->thd= 0;
-   memset(&connection->overlapped, 0, sizeof(OVERLAPPED));
-   InitializeThreadpoolEnvironment(&connection->callback_environ);
-   SetThreadpoolCallbackPool(&connection->callback_environ, pool);
-   connection->connect= connect;
- }
- int init_io(connection_t *connection, THD *thd)
- {
-   connection->thd= thd;
-   Vio *vio = thd->net.vio;
-   switch(vio->type)
+ struct TP_connection *new_TP_connection(CONNECT *connect)
+ {
+   TP_connection *c = new (std::nothrow) TP_connection_win(connect);
+   if (!c || c->init())
+   {
+     delete c;
+     return 0;
+   }
+   return c;
+ }
+ void TP_pool_win::add(TP_connection *c)
+ {
+   SubmitThreadpoolWork(((TP_connection_win *)c)->work);
+ }
+ TP_connection_win::TP_connection_win(CONNECT *c) :
+   TP_connection(c),
+   timeout(ULONGLONG_MAX),
+   callback_instance(0),
+   io(0),
+   shm_read(0),
+   timer(0),
+   work(0)
+ {
+ }
+ #define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; }
+ int TP_connection_win::init()
+ {
+   memset(&overlapped, 0, sizeof(OVERLAPPED));
+   Vio *vio = connect->vio;
+   switch ((vio_type = vio->type))
  {
  case VIO_TYPE_SSL:
  case VIO_TYPE_TCPIP:
-   connection->handle= (HANDLE)mysql_socket_getfd(connection->thd->net.vio->mysql_socket);
+   handle= (HANDLE)mysql_socket_getfd(vio->mysql_socket);
    break;
  case VIO_TYPE_NAMEDPIPE:
-   connection->handle= (HANDLE)vio->hPipe;
+   handle= (HANDLE)vio->hPipe;
    break;
  case VIO_TYPE_SHARED_MEMORY:
-   connection->shm_read= CreateThreadpoolWait(shm_read_callback, connection,
-     &connection->callback_environ);
-   if (!connection->shm_read)
-   {
-     tp_log_warning("Allocation failed", "CreateThreadpoolWait");
-     return -1;
-   }
+   handle= vio->event_server_wrote;
    break;
  default:
    abort();
  }
- if (connection->handle)
+ if (vio_type == VIO_TYPE_SHARED_MEMORY)
+ {
+   CHECK_ALLOC_ERROR(shm_read= CreateThreadpoolWait(shm_read_callback, this, &callback_environ));
+ }
+ else
  {
    /* Performance tweaks (s. MSDN documentation)*/
    UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE;
@ -176,25 +192,13 @@ int init_io(connection_t *connection, THD *thd)
    {
      flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
    }
-   (void)SetFileCompletionNotificationModes(connection->handle, flags);
+   (void)SetFileCompletionNotificationModes(handle, flags);
    /* Assign io completion callback */
-   connection->io= CreateThreadpoolIo(connection->handle,
+   CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
io_completion_callback, connection, &connection->callback_environ);
if(!connection->io)
{
tp_log_warning("Allocation failed", "CreateThreadpoolWait");
return -1;
}
}
connection->timer= CreateThreadpoolTimer(timer_callback, connection,
&connection->callback_environ);
if (!connection->timer)
{
tp_log_warning("Allocation failed", "CreateThreadpoolWait");
return -1;
  }
+ CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
+ CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
  return 0;
}
@ -202,9 +206,8 @@ int init_io(connection_t *connection, THD *thd)
/*
  Start asynchronous read
*/
- int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
+ int TP_connection_win::start_io()
{
- /* Start async read */
  DWORD num_bytes = 0;
  static char c;
  WSABUF buf;
@ -214,31 +217,18 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
  DWORD last_error= 0;
  int retval;
- Vio *vio= connection->thd->net.vio;
- if (vio->type == VIO_TYPE_SHARED_MEMORY)
+ if (shm_read)
  {
-   SetThreadpoolWait(connection->shm_read, vio->event_server_wrote, NULL);
+   SetThreadpoolWait(shm_read, handle, NULL);
    return 0;
  }
if (vio->type == VIO_CLOSED)
{
return -1;
}
DBUG_ASSERT(vio->type == VIO_TYPE_TCPIP ||
vio->type == VIO_TYPE_SSL ||
vio->type == VIO_TYPE_NAMEDPIPE);
OVERLAPPED *overlapped= &connection->overlapped;
PTP_IO io= connection->io;
  StartThreadpoolIo(io);
- if (vio->type == VIO_TYPE_TCPIP || vio->type == VIO_TYPE_SSL)
+ if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL)
  {
    /* Start async io (sockets). */
-   if (WSARecv(mysql_socket_getfd(vio->mysql_socket) , &buf, 1, &num_bytes, &flags,
-         overlapped, NULL) == 0)
+   if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags,
+         &overlapped, NULL) == 0)
    {
      retval= last_error= 0;
    }
@ -251,7 +241,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
  else
  {
    /* Start async io (named pipe) */
-   if (ReadFile(vio->hPipe, &c, 0, &num_bytes ,overlapped))
+   if (ReadFile(handle, &c, 0, &num_bytes,&overlapped))
    {
      retval= last_error= 0;
    }
@ -272,7 +262,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
  if(skip_completion_port_on_success)
  {
    CancelThreadpoolIo(io);
-   io_completion_callback(instance, connection, overlapped, last_error,
+   io_completion_callback(callback_instance, this, &overlapped, last_error,
      num_bytes, io);
  }
  return 0;
@ -288,81 +278,81 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
  return -1;
}
int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
{
if ((connection->thd= threadpool_add_connection(connection->connect, connection))
&& init_io(connection, connection->thd) == 0
&& start_io(connection, instance) == 0)
{
return 0;
}
return -1;
}
/*
  Recalculate wait timeout, maybe reset timer.
*/
- void set_wait_timeout(connection_t *connection, ulonglong old_timeout)
+ void TP_connection_win::set_io_timeout(int timeout_sec)
{
- ulonglong new_timeout = now() +
-   10000000LL*connection->thd->variables.net_wait_timeout;
+ ulonglong old_timeout= timeout;
+ ulonglong new_timeout = now() + 10000000LL * timeout_sec;
  if (new_timeout < old_timeout)
  {
-   SetThreadpoolTimer(connection->timer, (PFILETIME) &new_timeout, 0, 1000);
+   SetThreadpoolTimer(timer, (PFILETIME)&new_timeout, 0, 1000);
  }
- connection->timeout = new_timeout;
+ /* new_timeout > old_timeout case is handled by expiring timer. */
+ timeout = new_timeout;
}
- /* Connection destructor */
- void destroy_connection(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
- {
-   if (instance)
-     DisassociateCurrentThreadFromCallback(instance);
-   if (connection->io)
-   {
-     WaitForThreadpoolIoCallbacks(connection->io, TRUE);
-     CloseThreadpoolIo(connection->io);
-   }
-   if(connection->shm_read)
-   {
-     WaitForThreadpoolWaitCallbacks(connection->shm_read, TRUE);
-     CloseThreadpoolWait(connection->shm_read);
-   }
-   if(connection->timer)
-   {
-     SetThreadpoolTimer(connection->timer, 0, 0, 0);
-     WaitForThreadpoolTimerCallbacks(connection->timer, TRUE);
-     CloseThreadpoolTimer(connection->timer);
-   }
-   if (connection->thd)
-   {
-     threadpool_remove_connection(connection->thd);
-   }
-   DestroyThreadpoolEnvironment(&connection->callback_environ);
- }
+ TP_connection_win::~TP_connection_win()
+ {
+   if (io)
+     CloseThreadpoolIo(io);
+   if (shm_read)
+     CloseThreadpoolWait(shm_read);
+   if (work)
+     CloseThreadpoolWork(work);
+   if (timer)
+   {
+     WaitForThreadpoolTimerCallbacks(timer, TRUE);
+     CloseThreadpoolTimer(timer);
+   }
+ }
+ void TP_connection_win::wait_begin(int type)
+ {
+   /*
+     Signal to the threadpool whenever callback can run long. Currently, binlog
+     waits are a good candidate, its waits are really long
+   */
+   if (type == THD_WAIT_BINLOG)
+   {
+     if (!long_callback)
+     {
+       CallbackMayRunLong(callback_instance);
+       long_callback= true;
+     }
+   }
+ }
+ void TP_connection_win::wait_end()
+ {
+   /* Do we need to do anything ? */
+ }
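For illustration only, a small standalone sketch of the long-callback hint that wait_begin() above relies on: the native Windows pool is told at most once per callback instance that the current work item may block for a long time, so it can start another worker. The BlockingGuard wrapper and its names are illustrative assumptions, not part of the patch.

#include <windows.h>

/* Hint the OS thread pool, at most once per callback instance, that the
   currently running callback is about to block for a long time. */
struct BlockingGuard
{
  PTP_CALLBACK_INSTANCE instance;
  bool hinted;

  explicit BlockingGuard(PTP_CALLBACK_INSTANCE i) : instance(i), hinted(false) {}

  void about_to_block()
  {
    if (!hinted)
    {
      CallbackMayRunLong(instance);  /* pool may spin up an extra worker */
      hinted= true;                  /* calling it twice per callback is an error per the docs */
    }
  }
};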
/*
  This function should be called first whenever a callback is invoked in the
  threadpool, does my_thread_init() if not yet done
*/
extern ulong thread_created;
- static void check_thread_init()
+ static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance)
{
  if (FlsGetValue(fls) == NULL)
  {
+   /* Running in new worker thread*/
    FlsSetValue(fls, (void *)1);
    statistic_increment(thread_created, &LOCK_status);
    InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads);
+   my_thread_init();
  }
+ TP_connection_win *c = (TP_connection_win *)context;
+ c->callback_instance = instance;
+ c->long_callback = false;
}
@ -375,23 +365,111 @@ static VOID WINAPI thread_destructor(void *data)
  if(data)
  {
    InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads);
+   my_thread_end();
  }
}
- /* Scheduler callback : init */
- bool tp_init(void)
+ static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context)
{
pre_callback(context, instance);
tp_callback((TP_connection *)context);
}
/*
Handle read completion/notification.
*/
static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
{
TP_connection_win *c= (TP_connection_win *)context;
/*
Execute high priority connections immediately.
'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)
which makes Windows threadpool place the items at the end of its internal work queue.
*/
if (c->priority == TP_PRIORITY_HIGH)
tp_callback(instance, context);
else
SubmitThreadpoolWork(c->work);
}
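For illustration only, a minimal standalone sketch of the dispatch pattern used in io_completion_callback() above: high-priority work runs inline in the completion callback, while low-priority work is re-submitted so the pool places it behind items that are already queued. The Priority, Task and submit_to_pool names are illustrative assumptions, not part of the patch.

#include <deque>
#include <functional>

enum class Priority { HIGH, LOW };

struct Task
{
  Priority priority;
  std::function<void()> work;
};

/* Illustrative stand-in for SubmitThreadpoolWork(): re-queue the task so it
   runs after everything that is already waiting. */
static std::deque<Task*> pool_queue;
static void submit_to_pool(Task *t) { pool_queue.push_back(t); }

/* Completion handler: run urgent tasks immediately, push the rest back. */
static void on_completion(Task *t)
{
  if (t->priority == Priority::HIGH)
    t->work();            /* execute in the current callback */
  else
    submit_to_pool(t);    /* "yield": let queued work run first */
}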
/*
Timer callback.
Invoked when connection times out (wait_timeout)
*/
static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
PVOID parameter, PTP_TIMER timer)
{
TP_connection_win *c = (TP_connection_win *)parameter;
if (c->timeout <= now())
{
tp_timeout_handler(c);
}
else
{
/*
Reset timer.
There is a tiny possibility of a race condition, since the value of timeout
could have changed to smaller value in the thread doing io callback.
Given the relative unimportance of the wait timeout, we accept race
condition.
*/
SetThreadpoolTimer(timer, (PFILETIME)&c->timeout, 0, 1000);
}
}
/*
Shared memory read callback.
Invoked when read event is set on connection.
*/
static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result)
{
TP_connection_win *c= (TP_connection_win *)context;
/* Disarm wait. */
SetThreadpoolWait(wait, NULL, NULL);
/*
This is an autoreset event, and one wakeup is eaten already by threadpool,
and the current state is "not set". Thus we need to reset the event again,
or vio_read will hang.
*/
SetEvent(c->handle);
tp_callback(instance, context);
}
static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work)
{
tp_callback(instance, context);
}
TP_pool_win::TP_pool_win()
{}
int TP_pool_win::init()
{
  fls= FlsAlloc(thread_destructor);
  pool= CreateThreadpool(NULL);
  if (!pool)
  {
    sql_print_error("Can't create threadpool. "
      "CreateThreadpool() failed with %d. Likely cause is memory pressure",
      GetLastError());
-   exit(1);
+   return -1;
  }
+ InitializeThreadpoolEnvironment(&callback_environ);
+ SetThreadpoolCallbackPool(&callback_environ, pool);
  if (threadpool_max_threads)
  {
    SetThreadpoolThreadMaximum(pool, threadpool_max_threads);
@ -407,9 +485,8 @@ bool tp_init(void)
  }
  /*
-   Control stack size (OS must be Win7 or later, plus corresponding SDK)
+   Control stack size (OS must be Win7 or later)
  */
- #if _MSC_VER >=1600
  if (SetThreadpoolStackInformation)
  {
    TP_POOL_STACK_INFORMATION stackinfo;
@ -421,8 +498,6 @@ bool tp_init(void)
    "SetThreadpoolStackInformation");
  }
  }
- #endif
  return 0;
}
@ -430,209 +505,42 @@ bool tp_init(void)
/**
  Scheduler callback : Destroy the scheduler.
*/
- void tp_end(void)
+ TP_pool_win::~TP_pool_win()
{
- if(pool)
- {
+ if (!pool)
+   return;
+ DestroyThreadpoolEnvironment(&callback_environ);
  SetThreadpoolThreadMaximum(pool, 0);
  CloseThreadpool(pool);
- if (!tp_stats.num_worker_threads)
-   FlsFree(fls);
- }
}
/*
Handle read completion/notification.
*/
static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
{
if(instance)
{
check_thread_init();
}
connection_t *connection = (connection_t*)context;
if (io_result != ERROR_SUCCESS)
goto error;
THD *thd= connection->thd;
ulonglong old_timeout = connection->timeout;
connection->timeout = ULONGLONG_MAX;
connection->callback_instance= instance;
if (threadpool_process_request(connection->thd))
goto error;
set_wait_timeout(connection, old_timeout);
if(start_io(connection, instance))
goto error;
return;
error:
/* Some error has occurred. */
destroy_connection(connection, instance);
free(connection);
}
/* Simple callback for login */
static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PTP_WORK work)
{
if(instance)
{
check_thread_init();
}
connection_t *connection =(connection_t *)context;
if (login(connection, instance) != 0)
{
destroy_connection(connection, instance);
free(connection);
}
}
/*
Timer callback.
Invoked when connection times out (wait_timeout)
*/
static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
PVOID parameter, PTP_TIMER timer)
{
check_thread_init();
connection_t *con= (connection_t*)parameter;
ulonglong timeout= con->timeout;
if (timeout <= now())
{
con->thd->killed = KILL_CONNECTION;
if(con->thd->net.vio)
vio_shutdown(con->thd->net.vio, SD_BOTH);
}
else if(timeout != ULONGLONG_MAX)
{
/*
Reset timer.
There is a tiny possibility of a race condition, since the value of timeout
could have changed to smaller value in the thread doing io callback.
Given the relative unimportance of the wait timeout, we accept race
condition.
*/
SetThreadpoolTimer(timer, (PFILETIME)&timeout, 0, 1000);
}
}
/*
Shared memory read callback.
Invoked when read event is set on connection.
*/
static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result)
{
connection_t *con= (connection_t *)context;
/* Disarm wait. */
SetThreadpoolWait(wait, NULL, NULL);
/*
This is an autoreset event, and one wakeup is eaten already by threadpool,
and the current state is "not set". Thus we need to reset the event again,
or vio_read will hang.
*/
HANDLE h = con->thd->net.vio->event_server_wrote;
SetEvent(h);
io_completion_callback(instance, context, NULL, 0, 0 , 0);
}
/*
Notify the thread pool about a new connection.
*/
void tp_add_connection(CONNECT *connect)
{
connection_t *con;
con= (connection_t *)malloc(sizeof(connection_t));
DBUG_EXECUTE_IF("simulate_failed_connection_1", free(con);con= 0; );
if (!con)
{
tp_log_warning("Allocation failed", "tp_add_connection");
connect->close_and_delete();
return;
}
init_connection(con, connect);
/* Try to login asynchronously, using threads in the pool */
PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ);
if (wrk)
{
SubmitThreadpoolWork(wrk);
CloseThreadpoolWork(wrk);
}
else
{
/* Likely memory pressure */
connect->close_and_delete();
}
}
/**
  Sets the number of idle threads the thread pool maintains in anticipation of new
  requests.
*/
- void tp_set_min_threads(uint val)
+ int TP_pool_win::set_min_threads(uint val)
{
- if (pool)
  SetThreadpoolThreadMinimum(pool, val);
}
void tp_set_max_threads(uint val)
{
if (pool)
SetThreadpoolThreadMaximum(pool, val);
}
void tp_wait_begin(THD *thd, int type)
{
DBUG_ASSERT(thd);
/*
Signal to the threadpool whenever callback can run long. Currently, binlog
waits are a good candidate, its waits are really long
*/
if (type == THD_WAIT_BINLOG)
{
connection_t *connection= (connection_t *)thd->event_scheduler.data;
if(connection && connection->callback_instance)
{
CallbackMayRunLong(connection->callback_instance);
/*
Reset instance, to avoid calling CallbackMayRunLong twice within
the same callback (it is an error according to docs).
*/
connection->callback_instance= 0;
}
}
}
void tp_wait_end(THD *thd)
{
/* Do we need to do anything ? */
}
/**
Number of idle threads in pool.
This info is not available in Windows implementation,
thus function always returns 0.
*/
int tp_get_idle_thread_count()
{
  return 0;
}
int TP_pool_win::set_max_threads(uint val)
{
SetThreadpoolThreadMaximum(pool, val);
return 0;
}
TP_connection *TP_pool_win::new_connection(CONNECT *connect)
{
TP_connection *c= new (std::nothrow) TP_connection_win(connect);
if (!c )
return 0;
if (c->init())
{
delete c;
return 0;
}
return c;
}
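For illustration only, a short sketch of how the factory and queueing methods introduced above (TP_pool_win::new_connection() and TP_pool_win::add()) could be driven for an incoming connection. The driver function is an illustrative assumption; it relies on the CONNECT, TP_connection and TP_pool_win types from the surrounding server code and is not part of the patch.

/* Illustrative driver, not part of the patch: allocate a platform connection
   object, and if that succeeds, queue its first work item on the pool. */
static void illustrate_accept(TP_pool_win *pool, CONNECT *connect)
{
  TP_connection *c= pool->new_connection(connect);   /* new + init() */
  if (!c)
  {
    connect->close_and_delete();                     /* allocation or init failed */
    return;
  }
  pool->add(c);                                      /* submit first work item */
}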