1
0
mirror of https://github.com/mariadb-corporation/mariadb-connector-c.git synced 2025-08-07 02:42:49 +03:00

Moved async read/write to pvio:

plugins now contain their own asynchronous read/write functions.
Todo:
  - asynchronous SSL
  - asynchronous read/write for non socket plugins
This commit is contained in:
Georg Richter
2015-11-16 07:37:31 +01:00
parent 9b1cbf15a5
commit 0af129fe80
5 changed files with 67 additions and 62 deletions

View File

@@ -16,6 +16,10 @@
struct st_ma_pvio_methods; struct st_ma_pvio_methods;
typedef struct st_ma_pvio_methods PVIO_METHODS; typedef struct st_ma_pvio_methods PVIO_METHODS;
#define IS_ASYNC_ACTIVE(a) \
((a)->mysql->options.extension && (a)->mysql->options.extension->async_context && \
(a)->mysql->options.extension->async_context->active)
#ifndef ssl_defined #ifndef ssl_defined
#define ssl_defined #define ssl_defined
struct st_ma_pvio_ssl; struct st_ma_pvio_ssl;

View File

@@ -59,6 +59,10 @@ extern pthread_mutex_t THR_LOCK_lock;
/* callback functions for read/write */ /* callback functions for read/write */
LIST *pvio_callback= NULL; LIST *pvio_callback= NULL;
#define IS_BLOCKING_ERROR() \
IF_WIN(WSAGetLastError() != WSAEWOULDBLOCK, \
(errno != EAGAIN && errno != EINTR))
/* {{{ MARIADB_PVIO *ma_pvio_init */ /* {{{ MARIADB_PVIO *ma_pvio_init */
MARIADB_PVIO *ma_pvio_init(MA_PVIO_CINFO *cinfo) MARIADB_PVIO *ma_pvio_init(MA_PVIO_CINFO *cinfo)
{ {
@@ -173,7 +177,7 @@ my_bool ma_pvio_set_timeout(MARIADB_PVIO *pvio,
static size_t ma_pvio_read_async(MARIADB_PVIO *pvio, uchar *buffer, size_t length) static size_t ma_pvio_read_async(MARIADB_PVIO *pvio, uchar *buffer, size_t length)
{ {
ssize_t res; ssize_t res;
struct mysql_async_context *b= pvio->async_context; struct mysql_async_context *b= pvio->mysql->options.extension->async_context;
int timeout= pvio->timeout[PVIO_READ_TIMEOUT]; int timeout= pvio->timeout[PVIO_READ_TIMEOUT];
for (;;) for (;;)
@@ -207,11 +211,10 @@ size_t ma_pvio_read(MARIADB_PVIO *pvio, uchar *buffer, size_t length)
if (!pvio) if (!pvio)
return -1; return -1;
if (pvio && pvio->async_context && pvio->async_context->active) if (pvio && pvio->async_context && pvio->async_context->active)
{ {
goto end;
r= ma_pvio_read_async(pvio, buffer, length); r= ma_pvio_read_async(pvio, buffer, length);
goto end;
} }
else else
{ {
@@ -289,6 +292,36 @@ size_t ma_pvio_cache_read(MARIADB_PVIO *pvio, uchar *buffer, size_t length)
} }
/* }}} */ /* }}} */
/* {{{ size_t ma_pvio_write_async */
static size_t ma_pvio_write_async(MARIADB_PVIO *pvio, const uchar *buffer, size_t length)
{
ssize_t res;
struct mysql_async_context *b= pvio->mysql->options.extension->async_context;
int timeout= pvio->timeout[PVIO_WRITE_TIMEOUT];
for (;;)
{
if (pvio->methods->async_write)
res= pvio->methods->async_write(pvio, buffer, length);
if (res >= 0 || IS_BLOCKING_ERROR())
return res;
b->events_to_wait_for= MYSQL_WAIT_WRITE;
if (timeout >= 0)
{
b->events_to_wait_for|= MYSQL_WAIT_TIMEOUT;
b->timeout_value= timeout;
}
if (b->suspend_resume_hook)
(*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data);
my_context_yield(&b->async_context);
if (b->suspend_resume_hook)
(*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data);
if (b->events_occured & MYSQL_WAIT_TIMEOUT)
return -1;
}
}
/* }}} */
/* {{{ size_t ma_pvio_write */ /* {{{ size_t ma_pvio_write */
size_t ma_pvio_write(MARIADB_PVIO *pvio, const uchar *buffer, size_t length) size_t ma_pvio_write(MARIADB_PVIO *pvio, const uchar *buffer, size_t length)
{ {
@@ -315,8 +348,27 @@ size_t ma_pvio_write(MARIADB_PVIO *pvio, const uchar *buffer, size_t length)
r= ma_pvio_ssl_write(pvio->cssl, buffer, length); r= ma_pvio_ssl_write(pvio->cssl, buffer, length);
else else
#endif #endif
if (IS_ASYNC_ACTIVE(pvio))
{
r= ma_pvio_write_async(pvio, buffer, length);
goto end;
}
else
{
if (pvio->async_context)
{
/*
If switching from non-blocking to blocking API usage, set the socket
back to blocking mode.
*/
my_bool old_mode;
ma_pvio_blocking(pvio, TRUE, &old_mode);
}
}
if (pvio->methods->write) if (pvio->methods->write)
r= pvio->methods->write(pvio, buffer, length); r= pvio->methods->write(pvio, buffer, length);
end:
if (pvio->callback) if (pvio->callback)
pvio->callback(pvio, 0, buffer, r); pvio->callback(pvio, 0, buffer, r);
return r; return r;

View File

@@ -134,64 +134,6 @@ my_connect_async(MARIADB_PVIO *pvio,
#endif #endif
#endif #endif
ssize_t
my_recv_async(MARIADB_PVIO *pvio, unsigned char *buf, size_t size, int timeout)
{
ssize_t res;
struct mysql_async_context *b= pvio->async_context;
for (;;)
{
/* todo: async */
if (pvio->methods->async_read)
res= pvio->methods->async_read(pvio, buf, size);
if (res >= 0 || IS_BLOCKING_ERROR())
return res;
b->events_to_wait_for= MYSQL_WAIT_READ;
if (timeout >= 0)
{
b->events_to_wait_for|= MYSQL_WAIT_TIMEOUT;
b->timeout_value= timeout;
}
if (b->suspend_resume_hook)
(*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data);
my_context_yield(&b->async_context);
if (b->suspend_resume_hook)
(*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data);
if (b->events_occured & MYSQL_WAIT_TIMEOUT)
return -1;
}
}
ssize_t
my_send_async(MARIADB_PVIO *pvio, const unsigned char *buf, size_t size, int timeout)
{
ssize_t res;
struct mysql_async_context *b= pvio->async_context;
for (;;)
{
if (pvio->methods->async_write)
res= pvio->methods->async_write(pvio, buf, size);
if (res >= 0 || IS_BLOCKING_ERROR())
return res;
b->events_to_wait_for= MYSQL_WAIT_WRITE;
if (timeout >= 0)
{
b->events_to_wait_for|= MYSQL_WAIT_TIMEOUT;
b->timeout_value= timeout;
}
if (b->suspend_resume_hook)
(*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data);
my_context_yield(&b->async_context);
if (b->suspend_resume_hook)
(*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data);
if (b->events_occured & MYSQL_WAIT_TIMEOUT)
return -1;
}
}
my_bool my_bool
my_io_wait_async(struct mysql_async_context *b, enum enum_pvio_io_event event, my_io_wait_async(struct mysql_async_context *b, enum enum_pvio_io_event event,
int timeout) int timeout)

View File

@@ -313,6 +313,9 @@ size_t pvio_socket_async_read(MARIADB_PVIO *pvio, uchar *buffer, size_t length)
#ifndef _WIN32 #ifndef _WIN32
r= recv(csock->socket,(void *)buffer, length, read_flags); r= recv(csock->socket,(void *)buffer, length, read_flags);
#else #else
/* Windows doesn't support MSG_DONTWAIT, so we need to set
socket to non blocking */
pvio_socket_blocking(pvio, 0, 0);
r= recv(csock->socket, (char *)buffer, (int)length, 0); r= recv(csock->socket, (char *)buffer, (int)length, 0);
#endif #endif
return r; return r;
@@ -359,12 +362,16 @@ size_t pvio_socket_async_write(MARIADB_PVIO *pvio, const uchar *buffer, size_t l
#ifndef WIN32 #ifndef WIN32
r= send(csock->socket, buffer, length, write_flags); r= send(csock->socket, buffer, length, write_flags);
#else #else
/* Windows doesn't support MSG_DONTWAIT, so we need to set
socket to non blocking */
pvio_socket_blocking(pvio, 0, 0);
r= send(csock->socket, buffer, (int)length, 0); r= send(csock->socket, buffer, (int)length, 0);
#endif #endif
return r; return r;
} }
/* }}} */ /* }}} */
/* {{{ pvio_socket_write */ /* {{{ pvio_socket_write */
/* /*
write to socket write to socket

View File

@@ -118,7 +118,7 @@ wait_for_mysql(MYSQL *mysql, int status)
static int async1(MYSQL *my) static int async1(MYSQL *my)
{ {
int err, rc; int err= 0, rc;
MYSQL mysql, *ret; MYSQL mysql, *ret;
MYSQL_RES *res; MYSQL_RES *res;
MYSQL_ROW row; MYSQL_ROW row;