1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

Merge MWL#192: Non-blocking client library, into MariaDB 5.5.

This commit is contained in:
unknown
2012-02-21 22:15:44 +01:00
36 changed files with 5362 additions and 43 deletions

View File

@@ -112,6 +112,8 @@ my_bool net_flush(NET *net);
#include "client_settings.h"
#include <sql_common.h>
#include <mysql/client_plugin.h>
#include <my_context.h>
#include <mysql_async.h>
#define native_password_plugin_name "mysql_native_password"
#define old_password_plugin_name "mysql_old_password"
@@ -2939,6 +2941,22 @@ int run_plugin_auth(MYSQL *mysql, char *data, uint data_len,
}
static int
connect_sync_or_async(MYSQL *mysql, NET *net, my_socket fd,
const struct sockaddr *name, uint namelen)
{
if (mysql->options.extension && mysql->options.extension->async_context &&
mysql->options.extension->async_context->active)
{
my_bool old_mode;
vio_blocking(net->vio, FALSE, &old_mode);
return my_connect_async(mysql->options.extension->async_context, fd,
name, namelen, mysql->options.connect_timeout);
}
return my_connect(fd, name, namelen, mysql->options.connect_timeout);
}
MYSQL * STDCALL
CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
const char *passwd, const char *db,
@@ -3097,9 +3115,8 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
bzero((char*) &UNIXaddr, sizeof(UNIXaddr));
UNIXaddr.sun_family= AF_UNIX;
strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-1);
if (my_connect(sock, (struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr),
mysql->options.connect_timeout))
if (connect_sync_or_async(mysql, net, sock,
(struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr)))
{
DBUG_PRINT("error",("Got error %d on connect to local server",
socket_errno));
@@ -3214,9 +3231,18 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
continue;
}
net->vio= vio_new(sock, VIO_TYPE_TCPIP, VIO_BUFFERED_READ);
if (!net->vio)
{
set_mysql_error(mysql, CR_OUT_OF_MEMORY, unknown_sqlstate);
closesocket(sock);
freeaddrinfo(res_lst);
goto error;
}
DBUG_PRINT("info", ("Connect socket"));
status= my_connect(sock, t_res->ai_addr, t_res->ai_addrlen,
mysql->options.connect_timeout);
status= connect_sync_or_async(mysql, net, sock,
t_res->ai_addr, t_res->ai_addrlen);
/*
Here we rely on my_connect() to return success only if the
connect attempt was really successful. Otherwise we would stop
@@ -3232,7 +3258,8 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
saved_error= socket_errno;
DBUG_PRINT("info", ("No success, close socket, try next address."));
closesocket(sock);
vio_delete(mysql->net.vio);
mysql->net.vio= 0;
}
DBUG_PRINT("info",
("End of connect attempts, sock: %d status: %d error: %d",
@@ -3254,15 +3281,6 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
ER(CR_CONN_HOST_ERROR), host, saved_error);
goto error;
}
net->vio= vio_new(sock, VIO_TYPE_TCPIP, VIO_BUFFERED_READ);
if (! net->vio )
{
DBUG_PRINT("error",("Unknow protocol %d ", mysql->options.protocol));
set_mysql_error(mysql, CR_CONN_UNKNOW_PROTOCOL, unknown_sqlstate);
closesocket(sock);
goto error;
}
}
DBUG_PRINT("info", ("net->vio: %p", net->vio));
@@ -3273,6 +3291,9 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
goto error;
}
if (mysql->options.extension && mysql->options.extension->async_context)
net->vio->async_context= mysql->options.extension->async_context;
if (my_net_init(net, net->vio))
{
vio_delete(net->vio);
@@ -3492,16 +3513,39 @@ error:
/* Free alloced memory */
end_server(mysql);
mysql_close_free(mysql);
if (!(client_flag & CLIENT_REMEMBER_OPTIONS))
mysql_close_free_options(mysql);
}
DBUG_RETURN(0);
}
struct my_hook_data {
MYSQL *orig_mysql;
MYSQL *new_mysql;
/* This is always NULL currently, but restoring does not hurt just in case. */
Vio *orig_vio;
};
/*
Callback hook to make the new VIO accessible via the old MYSQL to calling
application when suspending a non-blocking call during automatic reconnect.
*/
static void
my_suspend_hook(my_bool suspend, void *data)
{
struct my_hook_data *hook_data= (struct my_hook_data *)data;
if (suspend)
{
hook_data->orig_vio= hook_data->orig_mysql->net.vio;
hook_data->orig_mysql->net.vio= hook_data->new_mysql->net.vio;
}
else
hook_data->orig_mysql->net.vio= hook_data->orig_vio;
}
my_bool mysql_reconnect(MYSQL *mysql)
{
MYSQL tmp_mysql;
struct my_hook_data hook_data;
struct mysql_async_context *ctxt= NULL;
DBUG_ENTER("mysql_reconnect");
DBUG_ASSERT(mysql);
DBUG_PRINT("enter", ("mysql->reconnect: %d", mysql->reconnect));
@@ -3518,10 +3562,31 @@ my_bool mysql_reconnect(MYSQL *mysql)
tmp_mysql.options= mysql->options;
tmp_mysql.options.my_cnf_file= tmp_mysql.options.my_cnf_group= 0;
/*
If we are automatically re-connecting inside a non-blocking API call, we
may need to suspend and yield to the user application during the reconnect.
If so, the user application will need access to the new VIO already then
so that it can correctly wait for I/O to become ready.
To achieve this, we temporarily install a hook that will temporarily put in
the VIO while we are suspended.
(The vio will be put in the original MYSQL permanently once we successfully
reconnect, or be discarded if we fail to reconnect.)
*/
if (mysql->options.extension &&
(ctxt= mysql->options.extension->async_context) &&
mysql->options.extension->async_context->active)
{
hook_data.orig_mysql= mysql;
hook_data.new_mysql= &tmp_mysql;
hook_data.orig_vio= mysql->net.vio;
my_context_install_suspend_resume_hook(ctxt, my_suspend_hook, &hook_data);
}
if (!mysql_real_connect(&tmp_mysql,mysql->host,mysql->user,mysql->passwd,
mysql->db, mysql->port, mysql->unix_socket,
mysql->client_flag | CLIENT_REMEMBER_OPTIONS))
mysql->client_flag))
{
if (ctxt)
my_context_install_suspend_resume_hook(ctxt, NULL, NULL);
mysql->net.last_errno= tmp_mysql.net.last_errno;
strmov(mysql->net.last_error, tmp_mysql.net.last_error);
strmov(mysql->net.sqlstate, tmp_mysql.net.sqlstate);
@@ -3532,11 +3597,15 @@ my_bool mysql_reconnect(MYSQL *mysql)
DBUG_PRINT("error", ("mysql_set_character_set() failed"));
bzero((char*) &tmp_mysql.options,sizeof(tmp_mysql.options));
mysql_close(&tmp_mysql);
if (ctxt)
my_context_install_suspend_resume_hook(ctxt, NULL, NULL);
mysql->net.last_errno= tmp_mysql.net.last_errno;
strmov(mysql->net.last_error, tmp_mysql.net.last_error);
strmov(mysql->net.sqlstate, tmp_mysql.net.sqlstate);
DBUG_RETURN(1);
}
if (ctxt)
my_context_install_suspend_resume_hook(ctxt, NULL, NULL);
DBUG_PRINT("info", ("reconnect succeded"));
tmp_mysql.reconnect= 1;
@@ -3615,8 +3684,14 @@ static void mysql_close_free_options(MYSQL *mysql)
#endif /* HAVE_SMEM */
if (mysql->options.extension)
{
struct mysql_async_context *ctxt= mysql->options.extension->async_context;
my_free(mysql->options.extension->plugin_dir);
my_free(mysql->options.extension->default_auth);
if (ctxt)
{
my_context_destroy(&ctxt->async_context);
my_free(ctxt);
}
my_free(mysql->options.extension);
}
bzero((char*) &mysql->options,sizeof(mysql->options));
@@ -3724,6 +3799,30 @@ void mysql_detach_stmt_list(LIST **stmt_list __attribute__((unused)),
(As some clients call this after mysql_real_connect() fails)
*/
/*
mysql_close() can actually block, at least in theory, if the socket buffer
is full when sending the COM_QUIT command.
On the other hand, the latter part of mysql_close() needs to free the stack
used for non-blocking operation of blocking stuff, so that later part can
_not_ be done non-blocking.
Therefore, mysql_close_slow_part() is used to run the parts of mysql_close()
that may block. It can be called before mysql_close(), and in that case
mysql_close() is guaranteed not to need to block. */
void STDCALL mysql_close_slow_part(MYSQL *mysql)
{
/* If connection is still up, send a QUIT message */
if (mysql->net.vio != 0)
{
free_old_query(mysql);
mysql->status=MYSQL_STATUS_READY; /* Force command */
mysql->reconnect=0;
simple_command(mysql,COM_QUIT,(uchar*) 0,0,1);
end_server(mysql); /* Sets mysql->net.vio= 0 */
}
}
void STDCALL mysql_close(MYSQL *mysql)
{
DBUG_ENTER("mysql_close");
@@ -3731,15 +3830,7 @@ void STDCALL mysql_close(MYSQL *mysql)
if (mysql) /* Some simple safety */
{
/* If connection is still up, send a QUIT message */
if (mysql->net.vio != 0)
{
free_old_query(mysql);
mysql->status=MYSQL_STATUS_READY; /* Force command */
mysql->reconnect=0;
simple_command(mysql,COM_QUIT,(uchar*) 0,0,1);
end_server(mysql); /* Sets mysql->net.vio= 0 */
}
mysql_close_slow_part(mysql);
mysql_close_free_options(mysql);
mysql_close_free(mysql);
mysql_detach_stmt_list(&mysql->stmts, "mysql_close");
@@ -4025,9 +4116,14 @@ mysql_fetch_lengths(MYSQL_RES *res)
}
#define ASYNC_CONTEXT_DEFAULT_STACK_SIZE (4096*15)
int STDCALL
mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg)
{
struct mysql_async_context *ctxt;
size_t stacksize;
DBUG_ENTER("mysql_options");
DBUG_PRINT("enter",("option: %d",(int) option));
switch (option) {
@@ -4120,6 +4216,39 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg)
mysql->options.extension->report_progress=
(void (*)(const MYSQL *, uint, uint, double, const char *, uint)) arg;
break;
case MYSQL_OPT_NONBLOCK:
if (mysql->options.extension &&
(ctxt = mysql->options.extension->async_context) != 0)
{
/*
We must not allow changing the stack size while a non-blocking call is
suspended (as the stack is then in use).
*/
if (ctxt->suspended)
DBUG_RETURN(1);
my_context_destroy(&ctxt->async_context);
my_free(ctxt);
}
if (!(ctxt= (struct mysql_async_context *)
my_malloc(sizeof(*ctxt), MYF(MY_ZEROFILL))))
{
set_mysql_error(mysql, CR_OUT_OF_MEMORY, unknown_sqlstate);
DBUG_RETURN(1);
}
stacksize= 0;
if (arg)
stacksize= *(const size_t *)arg;
if (!stacksize)
stacksize= ASYNC_CONTEXT_DEFAULT_STACK_SIZE;
if (my_context_init(&ctxt->async_context, stacksize))
{
my_free(ctxt);
DBUG_RETURN(1);
}
EXTENSION_SET(&(mysql->options), async_context, ctxt)
if (mysql->net.vio)
mysql->net.vio->async_context= ctxt;
break;
default:
DBUG_RETURN(1);
}
@@ -4329,3 +4458,11 @@ static int old_password_auth_client(MYSQL_PLUGIN_VIO *vio, MYSQL *mysql)
DBUG_RETURN(CR_OK);
}
my_socket STDCALL
mysql_get_socket(const MYSQL *mysql)
{
if (mysql->net.vio)
return mysql->net.vio->sd;
return INVALID_SOCKET;
}