1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-29 00:08:14 +03:00
Files
mariadb/extra/libevent/evbuffer.c
Michael Widenius 4fe3425009 Added "pool-of-threads" handling (with libevent)
This is a backport of code from MySQL 6.0 with cleanups and extensions

The following new options are supported
configure options:
  --with-libevent                  ; Enable use of libevent, which is needed for pool of threads

mysqld options:
--thread-handling=pool-of-threads  ; Use a pool of threads to handle queries
--thread-pool-size=#               ; Define how many threads should be created to handle all queries
--extra-port=#                     ; Extra tcp port that uses the old one-thread-per-connection method
--extra-max-connections=#          ; Number of connections to accept to 'extra-port'
--test-ignore-wrong-options        ; Ignore setting an enum value to a wrong option (for mysql-test-run)



BUILD/SETUP.sh:
  Added libevents (and thus pool-of-threads) to max builds
CMakeLists.txt:
  Added libevent
Makefile.am:
  Added libevents
config/ac-macros/libevent.m4:
  Libevent code for configure
config/ac-macros/libevent_configure.m4:
  Libevent code for configure
configure.in:
  Added libevents
dbug/dbug.c:
  Added _db_is_pushed(); Needed for pool-of-threads code
extra/Makefile.am:
  Added libevents
extra/libevent:
  Libevent initial code
extra/libevent/CMakeLists.txt:
  Libevent initial code
extra/libevent/Makefile.am:
  Libevent initial code
extra/libevent/README:
  Libevent initial code
extra/libevent/WIN32-Code:
  Libevent initial code
extra/libevent/WIN32-Code/config.h:
  Libevent initial code
extra/libevent/WIN32-Code/misc.c:
  Libevent initial code
extra/libevent/WIN32-Code/misc.h:
  Libevent initial code
extra/libevent/WIN32-Code/tree.h:
  Libevent initial code
extra/libevent/WIN32-Code/win32.c:
  Libevent initial code
extra/libevent/buffer.c:
  Libevent initial code
extra/libevent/compat:
  Libevent initial code
extra/libevent/compat/sys:
  Libevent initial code
extra/libevent/compat/sys/_time.h:
  Libevent initial code
extra/libevent/compat/sys/queue.h:
  Libevent initial code
extra/libevent/compat/sys/tree.h:
  Libevent initial code
extra/libevent/devpoll.c:
  Libevent initial code
extra/libevent/epoll.c:
  Libevent initial code
extra/libevent/epoll_sub.c:
  Libevent initial code
extra/libevent/evbuffer.c:
  Libevent initial code
extra/libevent/evdns.c:
  Libevent initial code
extra/libevent/evdns.h:
  Libevent initial code
extra/libevent/event-config.h:
  Libevent initial code
extra/libevent/event-internal.h:
  Libevent initial code
extra/libevent/event.c:
  Libevent initial code
extra/libevent/event.h:
  Libevent initial code
extra/libevent/event_tagging.c:
  Libevent initial code
extra/libevent/evhttp.h:
  Libevent initial code
extra/libevent/evport.c:
  Libevent initial code
extra/libevent/evrpc-internal.h:
  Libevent initial code
extra/libevent/evrpc.c:
  Libevent initial code
extra/libevent/evrpc.h:
  Libevent initial code
extra/libevent/evsignal.h:
  Libevent initial code
extra/libevent/evutil.c:
  Libevent initial code
extra/libevent/evutil.h:
  Libevent initial code
extra/libevent/http-internal.h:
  Libevent initial code
extra/libevent/http.c:
  Libevent initial code
extra/libevent/kqueue.c:
  Libevent initial code
extra/libevent/log.c:
  Libevent initial code
extra/libevent/log.h:
  Libevent initial code
extra/libevent/min_heap.h:
  Libevent initial code
extra/libevent/poll.c:
  Libevent initial code
extra/libevent/select.c:
  Libevent initial code
extra/libevent/signal.c:
  Libevent initial code
extra/libevent/strlcpy-internal.h:
  Libevent initial code
extra/libevent/strlcpy.c:
  Libevent initial code
include/config-win.h:
  Libevent support
include/my_dbug.h:
  ADded _db_is_pushed
include/mysql.h.pp:
  Update to handle new prototypes
include/typelib.h:
  Split find_type_or_exit() into two functions
include/violite.h:
  Added vio_is_pending()
libmysqld/Makefile.am:
  Added libevent
mysql-test/include/have_pool_of_threads.inc:
  Added test for pool-of-threads
mysql-test/mysql-test-run.pl:
  Don't abort based on time and don't retry test cases when run under --gdb or --debug
mysql-test/r/crash_commit_before.result:
  USE GLOBAL for debug variable
mysql-test/r/have_pool_of_threads.require:
  Added test for pool-of-threads
mysql-test/r/pool_of_threads.result:
  Added test for pool-of-threads
mysql-test/r/subselect_debug.result:
  USE GLOBAL for debug variable
mysql-test/t/crash_commit_before.test:
  USE GLOBAL for debug variable
mysql-test/t/merge-big.test:
  USE GLOBAL for debug variable
mysql-test/t/pool_of_threads-master.opt:
  Added test for pool-of-threads
mysql-test/t/pool_of_threads.test:
  Added test for pool-of-threads
mysys/typelib.c:
  Split find_type_or_exit() into find_type_with_warning()
sql/Makefile.am:
  Added libevent
sql/handler.cc:
  Indentation fix.
  Fixed memory loss bug
  Fixed crash on exit when handler plugin failed
sql/mysql_priv.h:
  Added extra_max_connections and mysqld_extra_port
  Added extern functions from sql_connect.cc
sql/mysqld.cc:
  Added support for new mysqld options
  Added code for 'extra-port' and 'extra-max-connections'
  Split some functions into smaller pieces to be able to reuse code
  Added code for test-ignore-wrong-options
sql/scheduler.cc:
  Updated schduler code from MySQL 6.0
sql/scheduler.h:
  Updated schduler code from MySQL 6.0
sql/set_var.cc:
  Added support for changing "extra_max_connections"
sql/sql_class.cc:
  Iniitalize thread schduler options in THD
sql/sql_class.h:
  Added to extra_port and scheduler to 'THD'
sql/sql_connect.cc:
  Use thd->schduler to check number of connections and terminate connection
  Made some local functions global (for scheduler.cc)
vio/viosocket.c:
  Added 'vio_pending', needed for scheduler..c
2009-03-13 00:27:35 +02:00

419 lines
9.6 KiB
C

/*
* Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef WIN32
#include <winsock2.h>
#endif
#include "evutil.h"
#include "event.h"
/* prototypes */
void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t);
void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
static int
bufferevent_add(struct event *ev, int timeout)
{
struct timeval tv, *ptv = NULL;
if (timeout) {
evutil_timerclear(&tv);
tv.tv_sec = timeout;
ptv = &tv;
}
return (event_add(ev, ptv));
}
/*
* This callback is executed when the size of the input buffer changes.
* We use it to apply back pressure on the reading side.
*/
void
bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
void *arg) {
struct bufferevent *bufev = arg;
/*
* If we are below the watermark then reschedule reading if it's
* still enabled.
*/
if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
evbuffer_setcb(buf, NULL, NULL);
if (bufev->enabled & EV_READ)
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
}
}
static void
bufferevent_readcb(int fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
int res = 0;
short what = EVBUFFER_READ;
size_t len;
int howmuch = -1;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
if (bufev->wm_read.high != 0)
howmuch = (int)bufev->wm_read.high;
res = evbuffer_read(bufev->input, fd, howmuch);
if (res == -1) {
if (errno == EAGAIN || errno == EINTR)
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (res <= 0)
goto error;
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
/* See if this callbacks meets the water marks */
len = EVBUFFER_LENGTH(bufev->input);
if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
return;
if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
struct evbuffer *buf = bufev->input;
event_del(&bufev->ev_read);
/* Now schedule a callback for us */
evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
return;
}
/* Invoke the user callback - must always be called last */
if (bufev->readcb != NULL)
(*bufev->readcb)(bufev, bufev->cbarg);
return;
reschedule:
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
return;
error:
(*bufev->errorcb)(bufev, what, bufev->cbarg);
}
static void
bufferevent_writecb(int fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
int res = 0;
short what = EVBUFFER_WRITE;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
if (EVBUFFER_LENGTH(bufev->output)) {
res = evbuffer_write(bufev->output, fd);
if (res == -1) {
#ifndef WIN32
/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
*set errno. thus this error checking is not portable*/
if (errno == EAGAIN ||
errno == EINTR ||
errno == EINPROGRESS)
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
#else
goto reschedule;
#endif
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (res <= 0)
goto error;
}
if (EVBUFFER_LENGTH(bufev->output) != 0)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
/*
* Invoke the user callback if our buffer is drained or below the
* low watermark.
*/
if (bufev->writecb != NULL &&
EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
(*bufev->writecb)(bufev, bufev->cbarg);
return;
reschedule:
if (EVBUFFER_LENGTH(bufev->output) != 0)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
return;
error:
(*bufev->errorcb)(bufev, what, bufev->cbarg);
}
/*
* Create a new buffered event object.
*
* The read callback is invoked whenever we read new data.
* The write callback is invoked whenever the output buffer is drained.
* The error callback is invoked on a write/read error or on EOF.
*
* Both read and write callbacks maybe NULL. The error callback is not
* allowed to be NULL and have to be provided always.
*/
struct bufferevent *
bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
everrorcb errorcb, void *cbarg)
{
struct bufferevent *bufev;
if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
return (NULL);
if ((bufev->input = evbuffer_new()) == NULL) {
free(bufev);
return (NULL);
}
if ((bufev->output = evbuffer_new()) == NULL) {
evbuffer_free(bufev->input);
free(bufev);
return (NULL);
}
event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = errorcb;
bufev->cbarg = cbarg;
/*
* Set to EV_WRITE so that using bufferevent_write is going to
* trigger a callback. Reading needs to be explicitly enabled
* because otherwise no data will be available.
*/
bufev->enabled = EV_WRITE;
return (bufev);
}
int
bufferevent_priority_set(struct bufferevent *bufev, int priority)
{
if (event_priority_set(&bufev->ev_read, priority) == -1)
return (-1);
if (event_priority_set(&bufev->ev_write, priority) == -1)
return (-1);
return (0);
}
/* Closing the file descriptor is the responsibility of the caller */
void
bufferevent_free(struct bufferevent *bufev)
{
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
free(bufev);
}
/*
* Returns 0 on success;
* -1 on failure.
*/
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
int res;
res = evbuffer_add(bufev->output, data, size);
if (res == -1)
return (res);
/* If everything is okay, we need to schedule a write */
if (size > 0 && (bufev->enabled & EV_WRITE))
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
return (res);
}
int
bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
int res;
res = bufferevent_write(bufev, buf->buffer, buf->off);
if (res != -1)
evbuffer_drain(buf, buf->off);
return (res);
}
size_t
bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
{
struct evbuffer *buf = bufev->input;
if (buf->off < size)
size = buf->off;
/* Copy the available data to the user buffer */
memcpy(data, buf->buffer, size);
if (size)
evbuffer_drain(buf, size);
return (size);
}
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
return (-1);
}
if (event & EV_WRITE) {
if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
return (-1);
}
bufev->enabled |= event;
return (0);
}
int
bufferevent_disable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (event_del(&bufev->ev_read) == -1)
return (-1);
}
if (event & EV_WRITE) {
if (event_del(&bufev->ev_write) == -1)
return (-1);
}
bufev->enabled &= ~event;
return (0);
}
/*
* Sets the read and write timeout for a buffered event.
*/
void
bufferevent_settimeout(struct bufferevent *bufev,
int timeout_read, int timeout_write) {
bufev->timeout_read = timeout_read;
bufev->timeout_write = timeout_write;
}
/*
* Sets the water marks
*/
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark)
{
if (events & EV_READ) {
bufev->wm_read.low = lowmark;
bufev->wm_read.high = highmark;
}
if (events & EV_WRITE) {
bufev->wm_write.low = lowmark;
bufev->wm_write.high = highmark;
}
/* If the watermarks changed then see if we should call read again */
bufferevent_read_pressure_cb(bufev->input,
0, EVBUFFER_LENGTH(bufev->input), bufev);
}
int
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
{
int res;
res = event_base_set(base, &bufev->ev_read);
if (res == -1)
return (res);
res = event_base_set(base, &bufev->ev_write);
return (res);
}