1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MWL#192: Non-blocking client API for libmysqlclient.

All client functions that can block on I/O have alternate _start() and
_cont() versions that do not block but return control back to the
application, which can then issue I/O wait in its own fashion and later
call back into the library to continue the operation.

Works behind the scenes by spawning a co-routine/fiber to run the
blocking operation and suspend it while waiting for I/O. This
co-routine/fiber use is invisible to applications.

For i368/x86_64 on GCC, uses very fast assembler co-routine support. On
Windows uses native Win32 Fibers. Falls back to POSIX ucontext on other
platforms. Assembler routines for more platforms are relatively easy to
add by extending mysys/my_context.c, eg. similar to the Lua lcoco
library.

For testing, mysqltest and mysql_client_test are extended with the
option --non-blocking-api. This causes the programs to use the
non-blocking API for database access. mysql-test-run.pl has a similar
option --non-blocking-api that uses this, as well as additional
testcases.

An example program tests/async_queries.c is included that uses the new
non-blocking API with libevent to show how, in a single-threaded
program, to issue many queries in parallel against a database.


client/async_example.c:
  Fix const warning
  ******
  Fix bug with wrong timeout value for poll().
include/Makefile.am:
  Fix missing include for `make dist`
include/mysql.h:
  Add prototypes for all non-blocking API calls.
include/mysql.h.pp:
  Add prototypes for all non-blocking API calls.
mysys/my_context.c:
  Fix type warning for makecontext() function pointer argument.
sql-common/mysql_async.c:
  Fix crashes in the non-blocking API for functions that can take MYSQL argument
  that is NULL.
tests/Makefile.am:
  Add header file to `make dist`
tests/mysql_client_test.c:
  Replace blocking calls with wrappers around the non-blocking calls, used in
  mysql_client_test to test the new non-blocking API.
tests/nonblock-wrappers.h:
  Replace blocking calls with wrappers around the non-blocking calls, used in
  mysql_client_test to test the new non-blocking API.
This commit is contained in:
unknown
2011-09-20 12:49:25 +02:00
parent 1a344b87e4
commit a5b881594d
41 changed files with 4389 additions and 38 deletions

View File

@@ -108,6 +108,7 @@ my_bool net_flush(NET *net);
#include "client_settings.h"
#include <sql_common.h>
#include <mysql/client_plugin.h>
#include "my_context.h"
#define native_password_plugin_name "mysql_native_password"
#define old_password_plugin_name "mysql_old_password"
@@ -1050,6 +1051,15 @@ static int add_init_command(struct st_mysql_options *options, const char *cmd)
return 0;
}
#define mysql_extension_get(MYSQL, X) \
((MYSQL)->extension ? (MYSQL)->extension->X : NULL)
#define mysql_extension_set(MYSQL, X, VAL) \
if (!(MYSQL)->extension) \
(MYSQL)->extension= (struct st_mysql_extension *) \
my_malloc(sizeof(struct st_mysql_extension), \
MYF(MY_WME | MY_ZEROFILL)); \
(MYSQL)->extension->X= VAL;
#define extension_set_string(OPTS, X, STR) \
if ((OPTS)->extension) \
my_free((OPTS)->extension->X, MYF(MY_ALLOW_ZERO_PTR)); \
@@ -1266,6 +1276,36 @@ void mysql_read_default_options(struct st_mysql_options *options,
DBUG_VOID_RETURN;
}
/*
Fetch the context for asynchronous API calls, allocating a new one if
necessary.
*/
#define STACK_SIZE (4096*15)
struct mysql_async_context *
mysql_get_async_context(MYSQL *mysql)
{
struct mysql_async_context *b;
if ((b= mysql_extension_get(mysql, async_context)))
return b;
if (!(b= (struct mysql_async_context *)
my_malloc(sizeof(*b), MYF(MY_ZEROFILL))))
{
set_mysql_error(mysql, CR_OUT_OF_MEMORY, unknown_sqlstate);
return NULL;
}
if (my_context_init(&b->async_context, STACK_SIZE))
{
my_free(b, MYF(0));
return NULL;
}
mysql_extension_set(mysql, async_context, b)
if (mysql->net.vio)
mysql->net.vio->async_context= b;
return b;
}
/**************************************************************************
Get column lengths of the current row
@@ -2537,6 +2577,26 @@ int run_plugin_auth(MYSQL *mysql, char *data, uint data_len,
}
static int
connect_sync_or_async(MYSQL *mysql, NET *net, my_socket fd,
const struct sockaddr *name, uint namelen)
{
extern int my_connect_async(struct mysql_async_context *b, my_socket fd,
const struct sockaddr *name, uint namelen,
uint timeout);
struct mysql_async_context *actxt= mysql_extension_get(mysql, async_context);
if (actxt && actxt->active)
{
my_bool old_mode;
vio_blocking(net->vio, FALSE, &old_mode);
return my_connect_async(actxt, fd, name, namelen,
mysql->options.connect_timeout);
}
else
return my_connect(fd, name, namelen, mysql->options.connect_timeout);
}
MYSQL * STDCALL
CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
const char *passwd, const char *db,
@@ -2552,6 +2612,7 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
struct sockaddr_in sock_addr;
ulong pkt_length;
NET *net= &mysql->net;
struct mysql_async_context *actxt;
#ifdef MYSQL_SERVER
thr_alarm_t alarmed;
ALARM alarm_buff;
@@ -2681,8 +2742,8 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
bzero((char*) &UNIXaddr,sizeof(UNIXaddr));
UNIXaddr.sun_family = AF_UNIX;
strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-1);
if (my_connect(sock,(struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr),
mysql->options.connect_timeout))
if (connect_sync_or_async(mysql, net, sock,
(struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr)))
{
DBUG_PRINT("error",("Got error %d on connect to local server",
socket_errno));
@@ -2763,8 +2824,9 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
if ((int) (ip_addr = inet_addr(host)) != (int) INADDR_NONE)
{
memcpy_fixed(&sock_addr.sin_addr,&ip_addr,sizeof(ip_addr));
status= my_connect(sock, (struct sockaddr *) &sock_addr,
sizeof(sock_addr), mysql->options.connect_timeout);
status= connect_sync_or_async(mysql, net, sock,
(struct sockaddr *) &sock_addr,
sizeof(sock_addr));
}
else
{
@@ -2795,8 +2857,9 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
min(sizeof(sock_addr.sin_addr), (size_t) hp->h_length));
DBUG_PRINT("info",("Trying %s...",
(my_inet_ntoa(sock_addr.sin_addr, ipaddr), ipaddr)));
status= my_connect(sock, (struct sockaddr *) &sock_addr,
sizeof(sock_addr), mysql->options.connect_timeout);
status= connect_sync_or_async(mysql, net, sock,
(struct sockaddr *) &sock_addr,
sizeof(sock_addr));
}
my_gethostbyname_r_free();
@@ -2818,6 +2881,9 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
goto error;
}
if ((actxt= mysql_extension_get(mysql, async_context)) && actxt->active)
net->vio->async_context= actxt;
if (my_net_init(net, net->vio))
{
vio_delete(net->vio);
@@ -3077,9 +3143,34 @@ static void mysql_fix_pointers(MYSQL* mysql, MYSQL* old_mysql)
#endif
struct my_hook_data {
MYSQL *orig_mysql;
MYSQL *new_mysql;
/* This is always NULL currently, but restoring does not hurt just in case. */
Vio *orig_vio;
};
/*
Callback hook to make the new VIO accessible via the old MYSQL to calling
application when suspending a non-blocking call during automatic reconnect.
*/
static void
my_suspend_hook(my_bool suspend, void *data)
{
struct my_hook_data *hook_data= (struct my_hook_data *)data;
if (suspend)
{
hook_data->orig_vio= hook_data->orig_mysql->net.vio;
hook_data->orig_mysql->net.vio= hook_data->new_mysql->net.vio;
}
else
hook_data->orig_mysql->net.vio= hook_data->orig_vio;
}
my_bool mysql_reconnect(MYSQL *mysql)
{
MYSQL tmp_mysql;
struct my_hook_data hook_data;
struct mysql_async_context *ctxt= NULL;
DBUG_ENTER("mysql_reconnect");
DBUG_ASSERT(mysql);
DBUG_PRINT("enter", ("mysql->reconnect: %d", mysql->reconnect));
@@ -3093,14 +3184,34 @@ my_bool mysql_reconnect(MYSQL *mysql)
DBUG_RETURN(1);
}
mysql_init(&tmp_mysql);
tmp_mysql.extension= mysql->extension;
tmp_mysql.options= mysql->options;
tmp_mysql.options.my_cnf_file= tmp_mysql.options.my_cnf_group= 0;
tmp_mysql.rpl_pivot= mysql->rpl_pivot;
/*
If we are automatically re-connecting inside a non-blocking API call, we
may need to suspend and yield to the user application during the reconnect.
If so, the user application will need access to the new VIO already then
so that it can correctly wait for I/O to become ready.
To achieve this, we temporarily install a hook that will temporarily put in
the VIO while we are suspended.
(The vio will be put in the original MYSQL permanently once we successfully
reconnect, or be discarded if we fail to reconnect.)
*/
if ((ctxt= mysql_extension_get(mysql, async_context)) && ctxt->active)
{
hook_data.orig_mysql= mysql;
hook_data.new_mysql= &tmp_mysql;
hook_data.orig_vio= mysql->net.vio;
my_context_install_suspend_resume_hook(ctxt, my_suspend_hook, &hook_data);
}
if (!mysql_real_connect(&tmp_mysql,mysql->host,mysql->user,mysql->passwd,
mysql->db, mysql->port, mysql->unix_socket,
mysql->client_flag | CLIENT_REMEMBER_OPTIONS))
{
if (ctxt)
my_context_install_suspend_resume_hook(ctxt, NULL, NULL);
mysql->net.last_errno= tmp_mysql.net.last_errno;
strmov(mysql->net.last_error, tmp_mysql.net.last_error);
strmov(mysql->net.sqlstate, tmp_mysql.net.sqlstate);
@@ -3109,13 +3220,18 @@ my_bool mysql_reconnect(MYSQL *mysql)
if (mysql_set_character_set(&tmp_mysql, mysql->charset->csname))
{
DBUG_PRINT("error", ("mysql_set_character_set() failed"));
tmp_mysql.extension= NULL;
bzero((char*) &tmp_mysql.options,sizeof(tmp_mysql.options));
mysql_close(&tmp_mysql);
if (ctxt)
my_context_install_suspend_resume_hook(ctxt, NULL, NULL);
mysql->net.last_errno= tmp_mysql.net.last_errno;
strmov(mysql->net.last_error, tmp_mysql.net.last_error);
strmov(mysql->net.sqlstate, tmp_mysql.net.sqlstate);
DBUG_RETURN(1);
}
if (ctxt)
my_context_install_suspend_resume_hook(ctxt, NULL, NULL);
DBUG_PRINT("info", ("reconnect succeded"));
tmp_mysql.reconnect= 1;
@@ -3125,7 +3241,11 @@ my_bool mysql_reconnect(MYSQL *mysql)
tmp_mysql.stmts= mysql->stmts;
mysql->stmts= 0;
/* Don't free options as these are now used in tmp_mysql */
/*
Don't free options as these are now used in tmp_mysql.
Same with extension.
*/
mysql->extension= NULL;
bzero((char*) &mysql->options,sizeof(mysql->options));
mysql->free_me=0;
mysql_close(mysql);
@@ -3204,6 +3324,21 @@ static void mysql_close_free_options(MYSQL *mysql)
}
static void
mysql_close_free_extension(MYSQL *mysql)
{
if (mysql->extension)
{
if (mysql->extension->async_context)
{
my_context_destroy(&mysql->extension->async_context->async_context);
my_free(mysql->extension->async_context, MYF(0));
}
my_free(mysql->extension, MYF(0));
mysql->extension= NULL;
}
}
static void mysql_close_free(MYSQL *mysql)
{
my_free((uchar*) mysql->host_info,MYF(MY_ALLOW_ZERO_PTR));
@@ -3304,6 +3439,33 @@ void mysql_detach_stmt_list(LIST **stmt_list __attribute__((unused)),
(As some clients call this after mysql_real_connect() fails)
*/
/*
mysql_close() can actually block, at least in theory, if the socket buffer
is full when sending the COM_QUIT command.
On the other hand, the latter part of mysql_close() needs to free the stack
used for non-blocking operation of blocking stuff, so that later part can
_not_ be done non-blocking.
Therefore, mysql_pre_close() is used to run the parts of mysql_close() that
may block. It can be called before mysql_close(), and in that case
mysql_close() is guaranteed not to need to block.
*/
void mysql_pre_close(MYSQL *mysql)
{
if (!mysql)
return;
/* If connection is still up, send a QUIT message */
if (mysql->net.vio != 0)
{
free_old_query(mysql);
mysql->status=MYSQL_STATUS_READY; /* Force command */
mysql->reconnect=0;
simple_command(mysql,COM_QUIT,(uchar*) 0,0,1);
end_server(mysql); /* Sets mysql->net.vio= 0 */
}
}
void STDCALL mysql_close(MYSQL *mysql)
{
DBUG_ENTER("mysql_close");
@@ -3311,16 +3473,9 @@ void STDCALL mysql_close(MYSQL *mysql)
if (mysql) /* Some simple safety */
{
/* If connection is still up, send a QUIT message */
if (mysql->net.vio != 0)
{
free_old_query(mysql);
mysql->status=MYSQL_STATUS_READY; /* Force command */
mysql->reconnect=0;
simple_command(mysql,COM_QUIT,(uchar*) 0,0,1);
end_server(mysql); /* Sets mysql->net.vio= 0 */
}
mysql_pre_close(mysql);
mysql_close_free_options(mysql);
mysql_close_free_extension(mysql);
mysql_close_free(mysql);
mysql_detach_stmt_list(&mysql->stmts, "mysql_close");
#ifndef TO_BE_DELETED
@@ -3941,3 +4096,12 @@ static int old_password_auth_client(MYSQL_PLUGIN_VIO *vio, MYSQL *mysql)
return CR_OK;
}
my_socket STDCALL
mysql_get_socket(const MYSQL *mysql)
{
if (mysql->net.vio)
return mysql->net.vio->sd;
else
return INVALID_SOCKET;
}