diff --git a/include/ma_pvio.h b/include/ma_pvio.h index 756fb9a0..5e87ad42 100644 --- a/include/ma_pvio.h +++ b/include/ma_pvio.h @@ -16,6 +16,10 @@ struct st_ma_pvio_methods; typedef struct st_ma_pvio_methods PVIO_METHODS; +#define IS_ASYNC_ACTIVE(a) \ + ((a)->mysql->options.extension && (a)->mysql->options.extension->async_context && \ + (a)->mysql->options.extension->async_context->active) + #ifndef ssl_defined #define ssl_defined struct st_ma_pvio_ssl; diff --git a/libmariadb/ma_pvio.c b/libmariadb/ma_pvio.c index 089de263..420d83f2 100644 --- a/libmariadb/ma_pvio.c +++ b/libmariadb/ma_pvio.c @@ -59,6 +59,10 @@ extern pthread_mutex_t THR_LOCK_lock; /* callback functions for read/write */ LIST *pvio_callback= NULL; +#define IS_BLOCKING_ERROR() \ + IF_WIN(WSAGetLastError() != WSAEWOULDBLOCK, \ + (errno != EAGAIN && errno != EINTR)) + /* {{{ MARIADB_PVIO *ma_pvio_init */ MARIADB_PVIO *ma_pvio_init(MA_PVIO_CINFO *cinfo) { @@ -173,7 +177,7 @@ my_bool ma_pvio_set_timeout(MARIADB_PVIO *pvio, static size_t ma_pvio_read_async(MARIADB_PVIO *pvio, uchar *buffer, size_t length) { ssize_t res; - struct mysql_async_context *b= pvio->async_context; + struct mysql_async_context *b= pvio->mysql->options.extension->async_context; int timeout= pvio->timeout[PVIO_READ_TIMEOUT]; for (;;) @@ -207,11 +211,10 @@ size_t ma_pvio_read(MARIADB_PVIO *pvio, uchar *buffer, size_t length) if (!pvio) return -1; - if (pvio && pvio->async_context && pvio->async_context->active) { - goto end; r= ma_pvio_read_async(pvio, buffer, length); + goto end; } else { @@ -289,6 +292,36 @@ size_t ma_pvio_cache_read(MARIADB_PVIO *pvio, uchar *buffer, size_t length) } /* }}} */ +/* {{{ size_t ma_pvio_write_async */ +static size_t ma_pvio_write_async(MARIADB_PVIO *pvio, const uchar *buffer, size_t length) +{ + ssize_t res; + struct mysql_async_context *b= pvio->mysql->options.extension->async_context; + int timeout= pvio->timeout[PVIO_WRITE_TIMEOUT]; + + for (;;) + { + if (pvio->methods->async_write) + res= pvio->methods->async_write(pvio, buffer, length); + if (res >= 0 || IS_BLOCKING_ERROR()) + return res; + b->events_to_wait_for= MYSQL_WAIT_WRITE; + if (timeout >= 0) + { + b->events_to_wait_for|= MYSQL_WAIT_TIMEOUT; + b->timeout_value= timeout; + } + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + if (b->events_occured & MYSQL_WAIT_TIMEOUT) + return -1; + } +} +/* }}} */ + /* {{{ size_t ma_pvio_write */ size_t ma_pvio_write(MARIADB_PVIO *pvio, const uchar *buffer, size_t length) { @@ -315,8 +348,27 @@ size_t ma_pvio_write(MARIADB_PVIO *pvio, const uchar *buffer, size_t length) r= ma_pvio_ssl_write(pvio->cssl, buffer, length); else #endif + if (IS_ASYNC_ACTIVE(pvio)) + { + r= ma_pvio_write_async(pvio, buffer, length); + goto end; + } + else + { + if (pvio->async_context) + { + /* + If switching from non-blocking to blocking API usage, set the socket + back to blocking mode. + */ + my_bool old_mode; + ma_pvio_blocking(pvio, TRUE, &old_mode); + } + } + if (pvio->methods->write) r= pvio->methods->write(pvio, buffer, length); +end: if (pvio->callback) pvio->callback(pvio, 0, buffer, r); return r; diff --git a/libmariadb/mysql_async.c b/libmariadb/mysql_async.c index b1e50bcf..00581dbd 100644 --- a/libmariadb/mysql_async.c +++ b/libmariadb/mysql_async.c @@ -134,64 +134,6 @@ my_connect_async(MARIADB_PVIO *pvio, #endif #endif -ssize_t -my_recv_async(MARIADB_PVIO *pvio, unsigned char *buf, size_t size, int timeout) -{ - ssize_t res; - struct mysql_async_context *b= pvio->async_context; - for (;;) - { - /* todo: async */ - if (pvio->methods->async_read) - res= pvio->methods->async_read(pvio, buf, size); - if (res >= 0 || IS_BLOCKING_ERROR()) - return res; - b->events_to_wait_for= MYSQL_WAIT_READ; - if (timeout >= 0) - { - b->events_to_wait_for|= MYSQL_WAIT_TIMEOUT; - b->timeout_value= timeout; - } - if (b->suspend_resume_hook) - (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); - my_context_yield(&b->async_context); - if (b->suspend_resume_hook) - (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); - if (b->events_occured & MYSQL_WAIT_TIMEOUT) - return -1; - } -} - - -ssize_t -my_send_async(MARIADB_PVIO *pvio, const unsigned char *buf, size_t size, int timeout) -{ - ssize_t res; - struct mysql_async_context *b= pvio->async_context; - - for (;;) - { - if (pvio->methods->async_write) - res= pvio->methods->async_write(pvio, buf, size); - if (res >= 0 || IS_BLOCKING_ERROR()) - return res; - b->events_to_wait_for= MYSQL_WAIT_WRITE; - if (timeout >= 0) - { - b->events_to_wait_for|= MYSQL_WAIT_TIMEOUT; - b->timeout_value= timeout; - } - if (b->suspend_resume_hook) - (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); - my_context_yield(&b->async_context); - if (b->suspend_resume_hook) - (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); - if (b->events_occured & MYSQL_WAIT_TIMEOUT) - return -1; - } -} - - my_bool my_io_wait_async(struct mysql_async_context *b, enum enum_pvio_io_event event, int timeout) diff --git a/plugins/pvio/pvio_socket.c b/plugins/pvio/pvio_socket.c index aa5c407c..5a1c6eb8 100644 --- a/plugins/pvio/pvio_socket.c +++ b/plugins/pvio/pvio_socket.c @@ -313,6 +313,9 @@ size_t pvio_socket_async_read(MARIADB_PVIO *pvio, uchar *buffer, size_t length) #ifndef _WIN32 r= recv(csock->socket,(void *)buffer, length, read_flags); #else + /* Windows doesn't support MSG_DONTWAIT, so we need to set + socket to non blocking */ + pvio_socket_blocking(pvio, 0, 0); r= recv(csock->socket, (char *)buffer, (int)length, 0); #endif return r; @@ -359,12 +362,16 @@ size_t pvio_socket_async_write(MARIADB_PVIO *pvio, const uchar *buffer, size_t l #ifndef WIN32 r= send(csock->socket, buffer, length, write_flags); #else + /* Windows doesn't support MSG_DONTWAIT, so we need to set + socket to non blocking */ + pvio_socket_blocking(pvio, 0, 0); r= send(csock->socket, buffer, (int)length, 0); #endif return r; } /* }}} */ + /* {{{ pvio_socket_write */ /* write to socket diff --git a/unittest/libmariadb/async.c b/unittest/libmariadb/async.c index 95e1b258..a0c3381b 100644 --- a/unittest/libmariadb/async.c +++ b/unittest/libmariadb/async.c @@ -118,7 +118,7 @@ wait_for_mysql(MYSQL *mysql, int status) static int async1(MYSQL *my) { - int err, rc; + int err= 0, rc; MYSQL mysql, *ret; MYSQL_RES *res; MYSQL_ROW row;