1
0
mirror of https://github.com/mariadb-corporation/mariadb-connector-c.git synced 2025-08-07 02:42:49 +03:00

Fixed vio for non-blocking API calls

This commit is contained in:
Georg Richter
2014-11-12 18:10:22 +01:00
parent b448d0313f
commit 088fdacfc1
13 changed files with 365 additions and 145 deletions

View File

@@ -87,7 +87,8 @@ size_t vio_write(Vio* vio, const gptr buf, size_t size);
* Whenever the socket is set to blocking mode or not. * Whenever the socket is set to blocking mode or not.
*/ */
int vio_blocking( Vio* vio, int vio_blocking( Vio* vio,
my_bool onoff); my_bool onoff,
my_bool *prevmode);
my_bool vio_is_blocking( Vio* vio); my_bool vio_is_blocking( Vio* vio);
/* /*
* setsockopt TCP_NODELAY at IPPROTO_TCP level, when possible. * setsockopt TCP_NODELAY at IPPROTO_TCP level, when possible.
@@ -135,6 +136,7 @@ void vio_in_addr(Vio *vio, struct in_addr *in);
/* Return 1 if there is data to be read */ /* Return 1 if there is data to be read */
my_bool vio_poll_read(Vio *vio,uint timeout); my_bool vio_poll_read(Vio *vio,uint timeout);
int vio_wait_or_timeout(Vio *vio, my_bool is_read, int timeout);
struct st_vio struct st_vio
@@ -146,7 +148,8 @@ struct st_vio
struct sockaddr_in local; /* Local internet address */ struct sockaddr_in local; /* Local internet address */
struct sockaddr_in remote; /* Remote internet address */ struct sockaddr_in remote; /* Remote internet address */
struct mysql_async_context *async_context; /* For non-blocking API */ struct mysql_async_context *async_context; /* For non-blocking API */
int write_timeout;
int read_timeout;
enum enum_vio_type type; /* Type of connection */ enum enum_vio_type type; /* Type of connection */
char desc[30]; /* String description */ char desc[30]; /* String description */
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL

View File

@@ -211,7 +211,7 @@ static int connect2(my_socket s, const struct sockaddr *name, size_t namelen,
poll_fd.fd= s; poll_fd.fd= s;
/* connection timeout in milliseconds */ /* connection timeout in milliseconds */
res= poll(&poll_fd, 1, (timeout > -1) ? timeout * 1000 : timeout); res= poll(&poll_fd, 1, timeout);
switch(res) switch(res)
{ {
@@ -248,6 +248,24 @@ static int connect2(my_socket s, const struct sockaddr *name, size_t namelen,
return (0); /* ok */ return (0); /* ok */
} }
static int
connect_sync_or_async(MYSQL *mysql, NET *net, my_socket fd,
const struct sockaddr *name, uint namelen)
{
int vio_timeout= (mysql->options.connect_timeout >= 0) ?
mysql->options.connect_timeout * 1000 : -1;
if (mysql->options.extension && mysql->options.extension->async_context &&
mysql->options.extension->async_context->active)
{
my_bool old_mode;
vio_blocking(net->vio, FALSE, &old_mode);
return my_connect_async(mysql->options.extension->async_context, fd,
name, namelen, vio_timeout);
}
return connect2(fd, name, namelen, vio_timeout);
}
/* /*
** Create a named pipe connection ** Create a named pipe connection
*/ */
@@ -291,7 +309,7 @@ HANDLE create_named_pipe(NET *net, uint connect_timeout, char **arg_host,
return INVALID_HANDLE_VALUE; return INVALID_HANDLE_VALUE;
} }
/* wait for for an other instance */ /* wait for for an other instance */
if (! WaitNamedPipe(szPipeName, connect_timeout*1000) ) if (! WaitNamedPipe(szPipeName, connect_timeout) )
{ {
net->last_errno=CR_NAMEDPIPEWAIT_ERROR; net->last_errno=CR_NAMEDPIPEWAIT_ERROR;
sprintf(net->last_error,ER(net->last_errno),host, unix_socket, sprintf(net->last_error,ER(net->last_errno),host, unix_socket,
@@ -1577,10 +1595,9 @@ MYSQL *mthd_my_real_connect(MYSQL *mysql, const char *host, const char *user,
bzero((char*) &UNIXaddr,sizeof(UNIXaddr)); bzero((char*) &UNIXaddr,sizeof(UNIXaddr));
UNIXaddr.sun_family = AF_UNIX; UNIXaddr.sun_family = AF_UNIX;
strmov(UNIXaddr.sun_path, unix_socket); strmov(UNIXaddr.sun_path, unix_socket);
if (connect2(sock,(struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr), if (connect_sync_or_async(mysql, net, sock,
mysql->options.connect_timeout) <0) (struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr)))
{ {
printf("err\n");
DBUG_PRINT("error",("Got error %d on connect to local server",socket_errno)); DBUG_PRINT("error",("Got error %d on connect to local server",socket_errno));
my_set_error(mysql, CR_CONNECTION_ERROR, SQLSTATE_UNKNOWN, ER(CR_CONNECTION_ERROR), my_set_error(mysql, CR_CONNECTION_ERROR, SQLSTATE_UNKNOWN, ER(CR_CONNECTION_ERROR),
unix_socket, socket_errno); unix_socket, socket_errno);
@@ -1675,17 +1692,20 @@ MYSQL *mthd_my_real_connect(MYSQL *mysql, const char *host, const char *user,
freeaddrinfo(res); freeaddrinfo(res);
goto error; goto error;
} }
if (!(rc= connect2(sock, save_res->ai_addr, save_res->ai_addrlen, rc= connect_sync_or_async(mysql, net, sock,
mysql->options.connect_timeout))) save_res->ai_addr, save_res->ai_addrlen);
if (!rc)
{ {
if (socket_block(sock, 1) == SOCKET_ERROR) if (mysql->options.extension && mysql->options.extension->async_context &&
mysql->options.extension->async_context->active)
break;
else if (socket_block(sock, 1) == SOCKET_ERROR)
{ {
closesocket(sock); closesocket(sock);
continue; continue;
} }
break; /* success! */ break; /* success! */
} }
vio_delete(mysql->net.vio); vio_delete(mysql->net.vio);
mysql->net.vio= NULL; mysql->net.vio= NULL;
} }
@@ -1708,6 +1728,13 @@ MYSQL *mthd_my_real_connect(MYSQL *mysql, const char *host, const char *user,
} }
} }
/* set timeouts */
net->vio->read_timeout= mysql->options.read_timeout;
net->vio->write_timeout= mysql->options.write_timeout;
if (mysql->options.extension && mysql->options.extension->async_context)
net->vio->async_context= mysql->options.extension->async_context;
if (!net->vio || my_net_init(net, net->vio)) if (!net->vio || my_net_init(net, net->vio))
{ {
vio_delete(net->vio); vio_delete(net->vio);
@@ -1720,16 +1747,16 @@ MYSQL *mthd_my_real_connect(MYSQL *mysql, const char *host, const char *user,
/* set read timeout */ /* set read timeout */
if (mysql->options.read_timeout) if (mysql->options.read_timeout >= 0)
vio_read_timeout(net->vio, mysql->options.read_timeout); vio_read_timeout(net->vio, mysql->options.read_timeout);
/* set write timeout */ /* set write timeout */
if (mysql->options.write_timeout) if (mysql->options.write_timeout >= 0)
vio_write_timeout(net->vio, mysql->options.read_timeout); vio_write_timeout(net->vio, mysql->options.read_timeout);
/* Get version info */ /* Get version info */
mysql->protocol_version= PROTOCOL_VERSION; /* Assume this */ mysql->protocol_version= PROTOCOL_VERSION; /* Assume this */
if (mysql->options.connect_timeout && if (mysql->options.connect_timeout >= 0 &&
vio_poll_read(net->vio, mysql->options.connect_timeout)) vio_wait_or_timeout(net->vio, FALSE, mysql->options.connect_timeout * 1000) < 1)
{ {
my_set_error(mysql, CR_SERVER_LOST, SQLSTATE_UNKNOWN, my_set_error(mysql, CR_SERVER_LOST, SQLSTATE_UNKNOWN,
ER(CR_SERVER_LOST_EXTENDED), ER(CR_SERVER_LOST_EXTENDED),

View File

@@ -109,7 +109,6 @@ EXPORTS
mysql_ping mysql_ping
mysql_ping_cont mysql_ping_cont
mysql_ping_start mysql_ping_start
mysql_plugin_options
mysql_ps_fetch_functions DATA mysql_ps_fetch_functions DATA
mysql_query mysql_query
mysql_query_cont mysql_query_cont

View File

@@ -25,6 +25,8 @@
#include <ma_secure.h> #include <ma_secure.h>
#include <errmsg.h> #include <errmsg.h>
#include <violite.h> #include <violite.h>
#include <mysql_async.h>
#include <my_context.h>
static my_bool my_ssl_initialized= FALSE; static my_bool my_ssl_initialized= FALSE;
static SSL_CTX *SSL_context= NULL; static SSL_CTX *SSL_context= NULL;
@@ -395,7 +397,7 @@ int my_ssl_connect(SSL *ssl)
/* Set socket to blocking if not already set */ /* Set socket to blocking if not already set */
if (!(blocking= vio_is_blocking(mysql->net.vio))) if (!(blocking= vio_is_blocking(mysql->net.vio)))
vio_blocking(mysql->net.vio, TRUE); vio_blocking(mysql->net.vio, TRUE, 0);
SSL_clear(ssl); SSL_clear(ssl);
SSL_SESSION_set_timeout(SSL_get_session(ssl), SSL_SESSION_set_timeout(SSL_get_session(ssl),
@@ -407,7 +409,7 @@ int my_ssl_connect(SSL *ssl)
my_SSL_error(mysql); my_SSL_error(mysql);
/* restore blocking mode */ /* restore blocking mode */
if (!blocking) if (!blocking)
vio_blocking(mysql->net.vio, FALSE); vio_blocking(mysql->net.vio, FALSE, 0);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
@@ -489,7 +491,10 @@ size_t my_ssl_write(Vio *vio, const uchar* buf, size_t size)
{ {
size_t written; size_t written;
DBUG_ENTER("my_ssl_write"); DBUG_ENTER("my_ssl_write");
if (vio->async_context && vio->async_context->active)
written= my_ssl_write_async(vio->async_context, (SSL *)vio->ssl, buf,
size);
else
written= SSL_write((SSL*) vio->ssl, buf, size); written= SSL_write((SSL*) vio->ssl, buf, size);
DBUG_RETURN(written); DBUG_RETURN(written);
} }
@@ -511,6 +516,9 @@ size_t my_ssl_read(Vio *vio, uchar* buf, size_t size)
size_t read; size_t read;
DBUG_ENTER("my_ssl_read"); DBUG_ENTER("my_ssl_read");
if (vio->async_context && vio->async_context->active)
read= my_ssl_read_async(vio->async_context, (SSL *)vio->ssl, buf, size);
else
read= SSL_read((SSL*) vio->ssl, buf, size); read= SSL_read((SSL*) vio->ssl, buf, size);
DBUG_RETURN(read); DBUG_RETURN(read);
} }

View File

@@ -41,12 +41,14 @@
*/ */
#define WIN_SET_NONBLOCKING(mysql) { \ #define WIN_SET_NONBLOCKING(mysql) { \
my_bool old_mode; \ my_bool old_mode; \
if ((mysql)->net.vio) vio_blocking((mysql)->net.vio, FALSE); \ if ((mysql)->net.vio) vio_blocking((mysql)->net.vio, FALSE, &old_mode); \
} }
#else #else
#define WIN_SET_NONBLOCKING(mysql) #define WIN_SET_NONBLOCKING(mysql)
#endif #endif
extern void mysql_close_slow_part(MYSQL *mysql);
void void
my_context_install_suspend_resume_hook(struct mysql_async_context *b, my_context_install_suspend_resume_hook(struct mysql_async_context *b,

View File

@@ -132,7 +132,7 @@ int my_net_init(NET *net, Vio* vio)
net->fd = vio_fd(vio); /* For perl DBI/DBD */ net->fd = vio_fd(vio); /* For perl DBI/DBD */
#if defined(MYSQL_SERVER) && !defined(__WIN32) && !defined(__EMX__) && !defined(OS2) #if defined(MYSQL_SERVER) && !defined(__WIN32) && !defined(__EMX__) && !defined(OS2)
if (!(test_flags & TEST_BLOCKING)) if (!(test_flags & TEST_BLOCKING))
vio_blocking(vio, FALSE); vio_blocking(vio, FALSE, 0);
#endif #endif
vio_fastsend(vio); vio_fastsend(vio);
} }
@@ -441,7 +441,7 @@ net_real_write(NET *net,const char *packet,size_t len)
{ /* Always true for client */ { /* Always true for client */
if (!vio_is_blocking(net->vio)) if (!vio_is_blocking(net->vio))
{ {
while (vio_blocking(net->vio, TRUE) < 0) while (vio_blocking(net->vio, TRUE, 0) < 0)
{ {
if (vio_should_retry(net->vio) && retry_count++ < RETRY_COUNT) if (vio_should_retry(net->vio) && retry_count++ < RETRY_COUNT)
continue; continue;
@@ -497,7 +497,7 @@ net_real_write(NET *net,const char *packet,size_t len)
if (thr_alarm_in_use(&alarmed)) if (thr_alarm_in_use(&alarmed))
{ {
thr_end_alarm(&alarmed); thr_end_alarm(&alarmed);
vio_blocking(net->vio, net_blocking); vio_blocking(net->vio, net_blocking, 0);
} }
net->reading_or_writing=0; net->reading_or_writing=0;
DBUG_RETURN(((int) (pos != end))); DBUG_RETURN(((int) (pos != end)));
@@ -522,7 +522,7 @@ static void my_net_skip_rest(NET *net, ulong remain, thr_alarm_t *alarmed,
if (!thr_alarm_in_use(alarmed)) if (!thr_alarm_in_use(alarmed))
{ {
if (thr_alarm(alarmed,net->timeout,alarm_buff) || if (thr_alarm(alarmed,net->timeout,alarm_buff) ||
(!vio_is_blocking(net->vio) && vio_blocking(net->vio,TRUE) < 0)) (!vio_is_blocking(net->vio) && vio_blocking(net->vio,TRUE, 0) < 0))
return; /* Can't setup, abort */ return; /* Can't setup, abort */
} }
while (remain > 0) while (remain > 0)
@@ -592,7 +592,7 @@ my_real_read(NET *net, size_t *complen)
{ {
if (!vio_is_blocking(net->vio)) if (!vio_is_blocking(net->vio))
{ {
while (vio_blocking(net->vio,TRUE) < 0) while (vio_blocking(net->vio,TRUE, 0) < 0)
{ {
if (vio_should_retry(net->vio) && if (vio_should_retry(net->vio) &&
retry_count++ < RETRY_COUNT) retry_count++ < RETRY_COUNT)
@@ -702,7 +702,7 @@ end:
if (thr_alarm_in_use(&alarmed)) if (thr_alarm_in_use(&alarmed))
{ {
thr_end_alarm(&alarmed); thr_end_alarm(&alarmed);
vio_blocking(net->vio, net_blocking); vio_blocking(net->vio, net_blocking, 0);
} }
net->reading_or_writing=0; net->reading_or_writing=0;
return(len); return(len);

View File

@@ -19,7 +19,7 @@
#include <my_global.h> #include <my_global.h>
#include <m_string.h> #include <m_string.h>
#if !defined(HAVE_STRTOLL) && defined(HAVE_LONG_LONG) #if !defined(_WIN32) && !defined(HAVE_STRTOLL) && defined(HAVE_LONG_LONG)
#define USE_LONGLONG #define USE_LONGLONG
#define strtoll glob_strtoll /* Fix for True64 */ #define strtoll glob_strtoll /* Fix for True64 */

View File

@@ -19,7 +19,7 @@
#include <my_global.h> #include <my_global.h>
#include <m_string.h> #include <m_string.h>
#if !defined(HAVE_STRTOULL) && defined(HAVE_LONG_LONG) #if !defined(_WIN32) && !defined(HAVE_STRTOULL) && defined(HAVE_LONG_LONG)
#define USE_UNSIGNED #define USE_UNSIGNED
#define USE_LONGLONG #define USE_LONGLONG
#include "strto.c" #include "strto.c"

View File

@@ -69,6 +69,14 @@
#define SOCKET_EWOULDBLOCK SOCKET_EAGAIN #define SOCKET_EWOULDBLOCK SOCKET_EAGAIN
#endif #endif
#include <mysql_async.h>
#include <my_context.h>
#ifdef _WIN32
#define ma_get_error() WSAGetLastError()
#else
#define ma_get_error() errno
#endif
typedef void *vio_ptr; typedef void *vio_ptr;
typedef char *vio_cstring; typedef char *vio_cstring;
@@ -82,6 +90,7 @@ void vio_reset(Vio* vio, enum enum_vio_type type,
my_bool localhost) my_bool localhost)
{ {
uchar *save_cache= vio->cache; uchar *save_cache= vio->cache;
int save_timeouts[2]= {vio->read_timeout, vio->write_timeout};
bzero((char*) vio, sizeof(*vio)); bzero((char*) vio, sizeof(*vio));
vio->type= type; vio->type= type;
vio->sd= sd; vio->sd= sd;
@@ -90,16 +99,18 @@ void vio_reset(Vio* vio, enum enum_vio_type type,
/* do not clear cache */ /* do not clear cache */
vio->cache= vio->cache_pos= save_cache; vio->cache= vio->cache_pos= save_cache;
vio->cache_size= 0; vio->cache_size= 0;
vio->read_timeout= save_timeouts[0];
vio->write_timeout= save_timeouts[1];
} }
void vio_timeout(Vio *vio, int type, uint seconds) void vio_timeout(Vio *vio, int type, uint timeval)
{ {
#ifdef _WIN32 #ifdef _WIN32
uint timeout= seconds * 1000; /* milli secs */ uint timeout= timeval; /* milli secs */
#else #else
struct timeval timeout; struct timeval timeout;
timeout.tv_sec= seconds; timeout.tv_sec= timeval;
timeout.tv_usec= 0; timeout.tv_usec= (timeval % 1000) * 1000;
#endif #endif
if (setsockopt(vio->sd, SOL_SOCKET, type, if (setsockopt(vio->sd, SOL_SOCKET, type,
@@ -114,14 +125,16 @@ void vio_timeout(Vio *vio, int type, uint seconds)
} }
} }
void vio_read_timeout(Vio *vio, uint seconds) void vio_read_timeout(Vio *vio, uint timeout)
{ {
vio_timeout(vio, SO_RCVTIMEO, seconds); vio->read_timeout= (timeout >= 0) ? timeout * 1000 : -1;
vio_timeout(vio, SO_RCVTIMEO, vio->read_timeout);
} }
void vio_write_timeout(Vio *vio, uint seconds) void vio_write_timeout(Vio *vio, uint timeout)
{ {
vio_timeout(vio, SO_SNDTIMEO, seconds); vio->write_timeout= (timeout >= 0) ? timeout * 1000 : -1;
vio_timeout(vio, SO_SNDTIMEO, vio->write_timeout);
} }
/* Open the socket or TCP/IP connection and read the fnctl() status */ /* Open the socket or TCP/IP connection and read the fnctl() status */
@@ -197,8 +210,75 @@ int vio_errno(Vio *vio __attribute__((unused)))
return socket_errno; /* On Win32 this mapped to WSAGetLastError() */ return socket_errno; /* On Win32 this mapped to WSAGetLastError() */
} }
int vio_wait_or_timeout(Vio *vio, my_bool is_read, int timeout)
{
int rc;
#ifndef _WIN32
struct pollfd p_fd;
#else
struct timeval tv= {0,0};
fd_set fds, exc_fds;
#endif
/* we don't support it via named pipes yet.
* maybe this could be handled via PeekNamedPipe somehow !? */
if (vio->type == VIO_TYPE_NAMEDPIPE)
return 1;
/*
Note that if zero timeout, then we will not block, so we do not need to
yield to calling application in the async case.
*/
if (timeout != 0 && vio->async_context && vio->async_context->active)
{
rc= my_io_wait_async(vio->async_context,
(is_read) ? VIO_IO_EVENT_READ : VIO_IO_EVENT_WRITE,
timeout);
return(rc);
}
else
{
#ifndef _WIN32
p_fd.fd= vio->sd;
p_fd.events= (is_read) ? POLLIN : POLLOUT;
do {
rc= poll(&p_fd, 1, timeout);
} while (rc == -1 || errno == EINTR);
if (rc == 0)
errno= ETIMEDOUT;
#else
FD_ZERO(&fds);
FD_ZERO(&exc_fds);
FD_SET(vio->sd, &fds);
FD_SET(vio->sd, &exc_fds);
if (timeout >= 0)
{
tv.tv_sec= timeout / 1000;
tv.tv_usec= (timeout % 1000) * 1000;
}
rc= select(0, (is_read) ? &fds : NULL,
(is_read) ? NULL : &fds,
&exc_fds,
(timeout >= 0) ? &tv : NULL);
if (rc == SOCKET_ERROR)
errno= WSAGetLastError();
if (rc == 0)
errno= ETIMEDOUT;
#endif
}
return rc;
}
size_t vio_real_read(Vio *vio, gptr buf, size_t size) size_t vio_real_read(Vio *vio, gptr buf, size_t size)
{ {
size_t r;
switch(vio->type) { switch(vio->type) {
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL
case VIO_TYPE_SSL: case VIO_TYPE_SSL:
@@ -216,13 +296,56 @@ size_t vio_real_read(Vio *vio, gptr buf, size_t size)
break; break;
#endif #endif
default: default:
return recv(vio->sd, buf, if (vio->async_context && vio->async_context->active)
#ifdef _WIN32 r= my_recv_async(vio->async_context,
(int) vio->sd,
buf, size, vio->read_timeout);
else
{
if (vio->async_context)
{
/*
If switching from non-blocking to blocking API usage, set the socket
back to blocking mode.
*/
my_bool old_mode;
vio_blocking(vio, TRUE, &old_mode);
}
#ifndef _WIN32
do {
r= read(vio->sd, buf, size);
} while (r == -1 && errno == EINTR);
while (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)
&& vio->read_timeout > 0)
{
if (vio_wait_or_timeout(vio, TRUE, vio->write_timeout) < 1)
return 0;
do {
r= read(vio->sd, buf, size);
} while (r == -1 && errno == EINTR);
}
#else
{
WSABUF wsaData;
DWORD dwBytes = 0;
DWORD flags = 0;
wsaData.len= size;
wsaData.buf= buf;
if (WSARecv(vio->sd, &wsaData, 1, &dwBytes, &flags, NULL, NULL) == SOCKET_ERROR)
{
errno= WSAGetLastError();
return 0;
}
r= (size_t)dwBytes;
}
#endif #endif
size, 0); }
break; break;
} }
return r;
} }
@@ -283,7 +406,7 @@ my_bool vio_read_peek(Vio *vio, size_t *bytes)
char buffer[1024]; char buffer[1024];
ssize_t length; ssize_t length;
vio_blocking(vio, 0); vio_blocking(vio, 0, 0);
length= recv(vio->sd, &buffer, sizeof(buffer), MSG_PEEK); length= recv(vio->sd, &buffer, sizeof(buffer), MSG_PEEK);
if (length < 0) if (length < 0)
return TRUE; return TRUE;
@@ -305,23 +428,60 @@ size_t vio_write(Vio * vio, const gptr buf, size_t size)
DBUG_RETURN(r); DBUG_RETURN(r);
} }
#endif #endif
#if defined( _WIN32) || defined(OS2) #ifdef _WIN32
if ( vio->type == VIO_TYPE_NAMEDPIPE) if ( vio->type == VIO_TYPE_NAMEDPIPE)
{ {
DWORD length; DWORD length;
#ifdef OS2
if (!DosWrite((HFILE)vio->hPipe, (char*) buf, size, &length))
DBUG_RETURN(-1);
#else
if (!WriteFile(vio->hPipe, (char*) buf, (DWORD)size, &length, NULL)) if (!WriteFile(vio->hPipe, (char*) buf, (DWORD)size, &length, NULL))
DBUG_RETURN(-1); DBUG_RETURN(-1);
#endif
DBUG_RETURN(length); DBUG_RETURN(length);
} }
r = send(vio->sd, buf, (int)size,0); #endif
if (vio->async_context && vio->async_context->active)
r= my_send_async(vio->async_context, vio->sd, buf, size,
vio->write_timeout);
else
{
if (vio->async_context)
{
/*
If switching from non-blocking to blocking API usage, set the socket
back to blocking mode.
*/
my_bool old_mode;
vio_blocking(vio, TRUE, &old_mode);
}
#ifndef _WIN32
do {
r= send(vio->sd, buf, size, vio->write_timeout ? MSG_DONTWAIT : MSG_WAITALL);
} while (r == -1 && errno == EINTR);
while (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK) &&
vio->write_timeout > 0)
{
if (vio_wait_or_timeout(vio, FALSE, vio->write_timeout) < 1)
return 0;
do {
r= send(vio->sd, buf, size, vio->write_timeout ? MSG_DONTWAIT : MSG_WAITALL);
} while (r == -1 && errno == EINTR);
}
#else #else
r = write(vio->sd, buf, size); {
#endif /* _WIN32 */ WSABUF wsaData;
DWORD dwBytes = 0;
wsaData.len= size;
wsaData.buf= (char *)buf;
if (WSASend(vio->sd, &wsaData, 1, &dwBytes, 0, NULL, NULL) == SOCKET_ERROR)
{
errno= WSAGetLastError();
DBUG_RETURN(0);
}
r= (size_t)dwBytes;
}
#endif
}
#ifndef DBUG_OFF #ifndef DBUG_OFF
if ((size_t)r == -1) if ((size_t)r == -1)
{ {
@@ -333,48 +493,50 @@ size_t vio_write(Vio * vio, const gptr buf, size_t size)
} }
int vio_blocking(Vio * vio, my_bool set_blocking_mode) int vio_blocking(Vio *vio, my_bool block, my_bool *previous_mode)
{ {
int r=0; int *sd_flags= &vio->fcntl_mode;
DBUG_ENTER("vio_blocking"); int save_flags= vio->fcntl_mode;
DBUG_PRINT("enter", ("set_blocking_mode: %d", (int) set_blocking_mode)); my_bool tmp;
my_socket sock= vio->sd;
#if !defined(__WIN32) && !defined(__EMX__) && !defined(OS2) if (vio->type == VIO_TYPE_NAMEDPIPE)
#if !defined(NO_FCNTL_NONBLOCK) return 0;
if (vio->sd >= 0) if (!previous_mode)
previous_mode= &tmp;
#ifdef _WIN32
*previous_mode= (*sd_flags & O_NONBLOCK) != 0;
*sd_flags = (block) ? *sd_flags & ~O_NONBLOCK : *sd_flags | O_NONBLOCK;
{ {
int old_fcntl=vio->fcntl_mode; ulong arg= 1 - block;
if (set_blocking_mode) if (ioctlsocket(sock, FIONBIO, (void *)&arg))
vio->fcntl_mode &= ~O_NONBLOCK; /* clear bit */ {
else vio->fcntl_mode= save_flags;
vio->fcntl_mode |= O_NONBLOCK; /* set bit */ return(WSAGetLastError());
if (old_fcntl != vio->fcntl_mode)
r = fcntl(vio->sd, F_SETFL, vio->fcntl_mode);
} }
#endif /* !defined(NO_FCNTL_NONBLOCK) */ }
#else /* !defined(_WIN32) && !defined(__EMX__) */ #else
#ifndef __EMX__ #if defined(O_NONBLOCK)
if (vio->type != VIO_TYPE_NAMEDPIPE) *previous_mode= (*sd_flags & O_NONBLOCK) != 0;
*sd_flags = (block) ? *sd_flags & ~O_NONBLOCK : *sd_flags | O_NONBLOCK;
#elif defined(O_NDELAY)
*previous_mode= (*sd_flags & O_NODELAY) != 0;
*sd_flags = (block) ? *sd_flags & ~O_NODELAY : *sd_flags | O_NODELAY;
#elif defined(FNDELAY)
*previous_mode= (*sd_flags & O_FNDELAY) != 0;
*sd_flags = (block) ? *sd_flags & ~O_FNDELAY : *sd_flags | O_FNDELAY;
#else
#error socket blocking is not supported on this platform
#endif #endif
if (fcntl(sock, F_SETFL, *sd_flags) == -1)
{ {
ulong arg; vio->fcntl_mode= save_flags;
int old_fcntl=vio->fcntl_mode; return errno;
if (set_blocking_mode)
{
arg = 0;
vio->fcntl_mode &= ~O_NONBLOCK; /* clear bit */
} }
else #endif
{ return 0;
arg = 1;
vio->fcntl_mode |= O_NONBLOCK; /* set bit */
}
if (old_fcntl != vio->fcntl_mode)
r = ioctlsocket(vio->sd,FIONBIO,(void*) &arg);
}
#endif /* !defined(_WIN32) && !defined(__EMX__) */
DBUG_RETURN(r);
} }
my_bool my_bool
@@ -527,7 +689,7 @@ void vio_in_addr(Vio *vio, struct in_addr *in)
/* Return 0 if there is data to be read */ /* Return 0 if there is data to be read */
/*
my_bool vio_poll_read(Vio *vio,uint timeout) my_bool vio_poll_read(Vio *vio,uint timeout)
{ {
#ifndef HAVE_POLL #ifndef HAVE_POLL
@@ -541,10 +703,11 @@ my_bool vio_poll_read(Vio *vio,uint timeout)
fds.revents=0; fds.revents=0;
if ((res=poll(&fds,1,(int) timeout*1000)) <= 0) if ((res=poll(&fds,1,(int) timeout*1000)) <= 0)
{ {
DBUG_RETURN(res < 0 ? 0 : 1); /* Don't return 1 on errors */ DBUG_RETURN(res < 0 ? 0 : 1);
} }
DBUG_RETURN(fds.revents & POLLIN ? 0 : 1); DBUG_RETURN(fds.revents & POLLIN ? 0 : 1);
#endif #endif
} }
*/
#endif /* HAVE_VIO */ #endif /* HAVE_VIO */

View File

@@ -79,7 +79,7 @@ wait_for_mysql(MYSQL *mysql, int status)
#else #else
struct pollfd pfd; struct pollfd pfd;
int timeout; int timeout;
int res; int res= -1;
pfd.fd= mysql_get_socket(mysql); pfd.fd= mysql_get_socket(mysql);
pfd.events= pfd.events=
@@ -87,10 +87,12 @@ wait_for_mysql(MYSQL *mysql, int status)
(status & MYSQL_WAIT_WRITE ? POLLOUT : 0) | (status & MYSQL_WAIT_WRITE ? POLLOUT : 0) |
(status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0); (status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0);
if (status & MYSQL_WAIT_TIMEOUT) if (status & MYSQL_WAIT_TIMEOUT)
timeout= 1000*mysql_get_timeout_value(mysql); timeout= mysql_get_timeout_value_ms(mysql);
else else
timeout= -1; timeout= -1;
do {
res= poll(&pfd, 1, timeout); res= poll(&pfd, 1, timeout);
} while (res == -1 && errno == EINTR);
if (res == 0) if (res == 0)
return MYSQL_WAIT_TIMEOUT; return MYSQL_WAIT_TIMEOUT;
else if (res < 0) else if (res < 0)
@@ -121,9 +123,20 @@ static int async1(MYSQL *my)
MYSQL_RES *res; MYSQL_RES *res;
MYSQL_ROW row; MYSQL_ROW row;
int status; int status;
uint default_timeout;
int i;
for (i=0; i < 100; i++)
{
mysql_init(&mysql); mysql_init(&mysql);
mysql_options(&mysql, MYSQL_OPT_NONBLOCK, 0); mysql_options(&mysql, MYSQL_OPT_NONBLOCK, 0);
/* set timeouts to 300 microseconds */
default_timeout= 300;
mysql_options(&mysql, MYSQL_OPT_READ_TIMEOUT, &default_timeout);
mysql_options(&mysql, MYSQL_OPT_CONNECT_TIMEOUT, &default_timeout);
mysql_options(&mysql, MYSQL_OPT_WRITE_TIMEOUT, &default_timeout);
mysql_options(&mysql, MYSQL_READ_DEFAULT_GROUP, "myapp"); mysql_options(&mysql, MYSQL_READ_DEFAULT_GROUP, "myapp");
/* Returns 0 when done, else flag for what to wait for when need to block. */ /* Returns 0 when done, else flag for what to wait for when need to block. */
@@ -134,7 +147,6 @@ static int async1(MYSQL *my)
status= wait_for_mysql(&mysql, status); status= wait_for_mysql(&mysql, status);
status= mysql_real_connect_cont(&ret, &mysql, status); status= mysql_real_connect_cont(&ret, &mysql, status);
} }
FAIL_IF(!ret, "Failed to mysql_real_connect()"); FAIL_IF(!ret, "Failed to mysql_real_connect()");
status= mysql_real_query_start(&err, &mysql, SL("SHOW STATUS")); status= mysql_real_query_start(&err, &mysql, SL("SHOW STATUS"));
@@ -159,7 +171,6 @@ static int async1(MYSQL *my)
} }
if (!row) if (!row)
break; break;
diag("%s: %s", row[0], row[1]);
} }
FAIL_IF(mysql_errno(&mysql), "Got error while retrieving rows"); FAIL_IF(mysql_errno(&mysql), "Got error while retrieving rows");
mysql_free_result(res); mysql_free_result(res);
@@ -176,6 +187,7 @@ static int async1(MYSQL *my)
status= wait_for_mysql(&mysql, status); status= wait_for_mysql(&mysql, status);
status= mysql_close_cont(&mysql, status); status= mysql_close_cont(&mysql, status);
} }
}
return OK; return OK;
} }

View File

@@ -142,7 +142,7 @@ static int test_conc70(MYSQL *my)
rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1"); rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
check_mysql_rc(rc, mysql); check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "CREATE TABLE t1 (a LONGBLOB)"); rc= mysql_query(mysql, "CREATE TABLE t1 (a LONGBLOB) engine=MyISAM");
check_mysql_rc(rc, mysql); check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "INSERT INTO t1 VALUES (REPEAT('A', 1024 * 1024 * 20))"); rc= mysql_query(mysql, "INSERT INTO t1 VALUES (REPEAT('A', 1024 * 1024 * 20))");
@@ -188,7 +188,7 @@ static int test_conc68(MYSQL *my)
rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1"); rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
check_mysql_rc(rc, mysql); check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "CREATE TABLE t1 (a LONGBLOB)"); rc= mysql_query(mysql, "CREATE TABLE t1 (a LONGBLOB) ENGINE=MyISAM");
check_mysql_rc(rc, mysql); check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "INSERT INTO t1 VALUES (REPEAT('A', 1024 * 1024 * 20))"); rc= mysql_query(mysql, "INSERT INTO t1 VALUES (REPEAT('A', 1024 * 1024 * 20))");

View File

@@ -12,6 +12,4 @@ openssl req -x509 -newkey rsa:1024 \
openssl rsa -in client-key-enc.pem -out client-key.pem \ openssl rsa -in client-key-enc.pem -out client-key.pem \
-passin pass:qwerty -passout pass: -passin pass:qwerty -passout pass:
cat server-cert.pem client-cert.pem > ca.pem cat server-cert.pem client-cert.pem > ca-cert.pem
cat client-key.pem client-cert.pem ca.pem > combined.pem

View File

@@ -922,6 +922,14 @@ static int test_connect_attrs(MYSQL *my)
} }
result= mysql_store_result(my); result= mysql_store_result(my);
/* MariaDB Connector/C already sent connection attrs after handshake. So if the table is
empty, it indicates that the performance schema is disabled */
if (!mysql_num_rows(result))
{
diag("skip: performance_schema not enabled");
mysql_free_result(result);
return SKIP;
}
mysql_free_result(result); mysql_free_result(result);
mysql= mysql_init(NULL); mysql= mysql_init(NULL);