1
0
mirror of https://github.com/lammertb/libhttp.git synced 2026-01-27 08:02:47 +03:00

Moved read_websocket to own file

This commit is contained in:
Lammert Bies
2016-12-10 20:45:46 +01:00
parent 60339f151b
commit 8910429c47
4 changed files with 201 additions and 160 deletions

View File

@@ -74,6 +74,7 @@ LIB_SOURCES = src/libhttp.c \
src/httplib_parse_net.c \
src/httplib_process_new_connection.c \
src/httplib_produce_socket.c \
src/httplib_read_websocket.c \
src/httplib_redirect_to_https_port.c \
src/httplib_refresh_trust.c \
src/httplib_reset_per_request_attributes.c \

View File

@@ -0,0 +1,193 @@
/*
* Copyright (c) 2016 Lammert Bies
* Copyright (c) 2013-2016 the Civetweb developers
* Copyright (c) 2004-2013 Sergey Lyubka
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "libhttp-private.h"
/*
* void XX_httplib_read_websocket( struct mg_connection *conn, mg_websocket_data_handler ws_data_handler, void *calback_data );
*
* The function XX_httplib_read_websocket() reads from a websocket connection.
*/
#if defined(USE_WEBSOCKET)
void XX_httplib_read_websocket( struct mg_connection *conn, mg_websocket_data_handler ws_data_handler, void *callback_data ) {
/* Pointer to the beginning of the portion of the incoming websocket
* message queue.
* The original websocket upgrade request is never removed, so the queue
* begins after it. */
unsigned char *buf = (unsigned char *)conn->buf + conn->request_len;
int n;
int error;
int exit_by_callback;
/* body_len is the length of the entire queue in bytes
* len is the length of the current message
* data_len is the length of the current message's data payload
* header_len is the length of the current message's header */
size_t i;
size_t len;
size_t mask_len = 0;
size_t data_len = 0;
size_t header_len;
size_t body_len;
/* "The masking key is a 32-bit value chosen at random by the client."
* http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17#section-5
*/
unsigned char mask[4];
/* data points to the place where the message is stored when passed to
* the
* websocket_data callback. This is either mem on the stack, or a
* dynamically allocated buffer if it is too large. */
char mem[4096];
char *data = mem;
unsigned char mop; /* mask flag and opcode */
double timeout;
timeout = -1.0;
if ( conn->ctx->config[WEBSOCKET_TIMEOUT] ) timeout = atoi(conn->ctx->config[WEBSOCKET_TIMEOUT]) / 1000.0;
if ( timeout <= 0.0 && (conn->ctx->config[REQUEST_TIMEOUT]) ) timeout = atoi(conn->ctx->config[REQUEST_TIMEOUT]) / 1000.0;
XX_httplib_set_thread_name("wsock");
/* Loop continuously, reading messages from the socket, invoking the
* callback, and waiting repeatedly until an error occurs. */
while (!conn->ctx->stop_flag) {
header_len = 0;
assert(conn->data_len >= conn->request_len);
if ((body_len = (size_t)(conn->data_len - conn->request_len)) >= 2) {
len = buf[1] & 127;
mask_len = (buf[1] & 128) ? 4 : 0;
if ((len < 126) && (body_len >= mask_len)) {
data_len = len;
header_len = 2 + mask_len;
} else if ((len == 126) && (body_len >= (4 + mask_len))) {
header_len = 4 + mask_len;
data_len = ((((size_t)buf[2]) << 8) + buf[3]);
} else if (body_len >= (10 + mask_len)) {
header_len = 10 + mask_len;
data_len = (((uint64_t)ntohl(*(uint32_t *)(void *)&buf[2]))
<< 32) + ntohl(*(uint32_t *)(void *)&buf[6]);
}
}
if (header_len > 0 && body_len >= header_len) {
/* Allocate space to hold websocket payload */
data = mem;
if (data_len > sizeof(mem)) {
data = (char *)XX_httplib_malloc(data_len);
if (data == NULL) {
/* Allocation failed, exit the loop and then close the
* connection */
mg_cry(conn, "websocket out of memory; closing connection");
break;
}
}
/* Copy the mask before we shift the queue and destroy it */
if (mask_len > 0) memcpy( mask, buf + header_len - mask_len, sizeof(mask) );
else memset( mask, 0, sizeof(mask) );
/* Read frame payload from the first message in the queue into
* data and advance the queue by moving the memory in place. */
assert(body_len >= header_len);
if (data_len + header_len > body_len) {
mop = buf[0]; /* current mask and opcode */
/* Overflow case */
len = body_len - header_len;
memcpy(data, buf + header_len, len);
error = 0;
while (len < data_len) {
n = XX_httplib_pull( NULL, conn, data + len, (int)(data_len - len), timeout);
if (n <= 0) {
error = 1;
break;
}
len += (size_t)n;
}
if (error) {
mg_cry(conn, "Websocket pull failed; closing connection");
break;
}
conn->data_len = conn->request_len;
} else {
mop = buf[0]; /* current mask and opcode, overwritten by
* memmove() */
/* Length of the message being read at the front of the
* queue */
len = data_len + header_len;
/* Copy the data payload into the data pointer for the
* callback */
memcpy(data, buf + header_len, data_len);
/* Move the queue forward len bytes */
memmove(buf, buf + len, body_len - len);
/* Mark the queue as advanced */
conn->data_len -= (int)len;
}
/* Apply mask if necessary */
if (mask_len > 0) for (i = 0; i < data_len; ++i) data[i] ^= mask[i & 3];
/* Exit the loop if callback signals to exit (server side),
* or "connection close" opcode received (client side). */
exit_by_callback = 0;
if ((ws_data_handler != NULL) && !ws_data_handler(conn, mop, data, data_len, callback_data)) {
exit_by_callback = 1;
}
if (data != mem) XX_httplib_free(data);
if (exit_by_callback || ((mop & 0xf) == WEBSOCKET_OPCODE_CONNECTION_CLOSE)) {
/* Opcode == 8, connection close */
break;
}
/* Not breaking the loop, process next websocket frame. */
} else {
/* Read from the socket into the next available location in the
* message queue. */
if ((n = XX_httplib_pull(NULL, conn, conn->buf + conn->data_len, conn->buf_size - conn->data_len, timeout)) <= 0) {
/* Error, no bytes read */
break;
}
conn->data_len += n;
}
}
XX_httplib_set_thread_name("worker");
} /* XX_httplib_read_websocket */
#endif /* !USE_WEBSOCKET */

View File

@@ -887,6 +887,7 @@ int XX_httplib_parse_http_message( char *buf, int len, struct mg_request_info
int XX_httplib_parse_net( const char *spec, uint32_t *net, uint32_t *mask );
void XX_httplib_process_new_connection( struct mg_connection *conn );
void XX_httplib_produce_socket( struct mg_context *ctx, const struct socket *sp );
int XX_httplib_pull( FILE *fp, struct mg_connection *conn, char *buf, int len, double timeout );
void XX_httplib_put_file( struct mg_connection *conn, const char *path );
int XX_httplib_read_request( FILE *fp, struct mg_connection *conn, char *buf, int bufsiz, int *nread );
void XX_httplib_read_websocket( struct mg_connection *conn, mg_websocket_data_handler ws_data_handler, void *callback_data );

View File

@@ -2712,7 +2712,7 @@ static int64_t push_all(struct mg_context *ctx, FILE *fp, SOCKET sock, SSL *ssl,
/* Read from IO channel - opened file descriptor, socket, or SSL descriptor.
* Return negative value on error, or number of bytes read on success. */
static int pull(FILE *fp, struct mg_connection *conn, char *buf, int len, double timeout) {
int XX_httplib_pull( FILE *fp, struct mg_connection *conn, char *buf, int len, double timeout ) {
int nread;
int err;
@@ -2808,7 +2808,8 @@ static int pull(FILE *fp, struct mg_connection *conn, char *buf, int len, double
/* Timeout occured, but no data available. */
return -1;
}
} /* XX_httplib_pull */
static int pull_all(FILE *fp, struct mg_connection *conn, char *buf, int len) {
@@ -2822,7 +2823,7 @@ static int pull_all(FILE *fp, struct mg_connection *conn, char *buf, int len) {
}
while (len > 0 && conn->ctx->stop_flag == 0) {
n = pull(fp, conn, buf + nread, len, timeout);
n = XX_httplib_pull(fp, conn, buf + nread, len, timeout);
if (n < 0) {
if (nread == 0) {
nread = n; /* Propagate the error */
@@ -5358,7 +5359,7 @@ int XX_httplib_read_request( FILE *fp, struct mg_connection *conn, char *buf, in
(conn->ctx->stop_flag == 0) && (*nread < bufsiz) && (request_len == 0)
&& ((mg_difftimespec(&last_action_time, &(conn->req_time))
<= request_timeout) || (request_timeout < 0))
&& ((n = pull(fp, conn, buf + *nread, bufsiz - *nread, request_timeout))
&& ((n = XX_httplib_pull(fp, conn, buf + *nread, bufsiz - *nread, request_timeout))
> 0)) {
*nread += n;
/* assert(*nread <= bufsiz); */
@@ -5500,7 +5501,7 @@ static int forward_body_data(struct mg_connection *conn, FILE *fp, SOCKET sock,
if ((int64_t)to_read > conn->content_len - conn->consumed_content) {
to_read = (int)(conn->content_len - conn->consumed_content);
}
nread = pull(NULL, conn, buf, to_read, timeout);
nread = XX_httplib_pull(NULL, conn, buf, to_read, timeout);
if (nread <= 0 || push_all(conn->ctx, fp, sock, ssl, buf, nread) != nread) break;
conn->consumed_content += nread;
}
@@ -6803,159 +6804,4 @@ int XX_httplib_send_websocket_handshake( struct mg_connection *conn, const char
} /* XX_httplib_send_websocket_handshake */
void XX_httplib_read_websocket( struct mg_connection *conn, mg_websocket_data_handler ws_data_handler, void *callback_data ) {
/* Pointer to the beginning of the portion of the incoming websocket
* message queue.
* The original websocket upgrade request is never removed, so the queue
* begins after it. */
unsigned char *buf = (unsigned char *)conn->buf + conn->request_len;
int n;
int error;
int exit_by_callback;
/* body_len is the length of the entire queue in bytes
* len is the length of the current message
* data_len is the length of the current message's data payload
* header_len is the length of the current message's header */
size_t i;
size_t len;
size_t mask_len = 0;
size_t data_len = 0;
size_t header_len;
size_t body_len;
/* "The masking key is a 32-bit value chosen at random by the client."
* http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17#section-5
*/
unsigned char mask[4];
/* data points to the place where the message is stored when passed to
* the
* websocket_data callback. This is either mem on the stack, or a
* dynamically allocated buffer if it is too large. */
char mem[4096];
char *data = mem;
unsigned char mop; /* mask flag and opcode */
double timeout;
timeout = -1.0;
if ( conn->ctx->config[WEBSOCKET_TIMEOUT] ) timeout = atoi(conn->ctx->config[WEBSOCKET_TIMEOUT]) / 1000.0;
if ( timeout <= 0.0 && (conn->ctx->config[REQUEST_TIMEOUT]) ) timeout = atoi(conn->ctx->config[REQUEST_TIMEOUT]) / 1000.0;
XX_httplib_set_thread_name("wsock");
/* Loop continuously, reading messages from the socket, invoking the
* callback, and waiting repeatedly until an error occurs. */
while (!conn->ctx->stop_flag) {
header_len = 0;
assert(conn->data_len >= conn->request_len);
if ((body_len = (size_t)(conn->data_len - conn->request_len)) >= 2) {
len = buf[1] & 127;
mask_len = (buf[1] & 128) ? 4 : 0;
if ((len < 126) && (body_len >= mask_len)) {
data_len = len;
header_len = 2 + mask_len;
} else if ((len == 126) && (body_len >= (4 + mask_len))) {
header_len = 4 + mask_len;
data_len = ((((size_t)buf[2]) << 8) + buf[3]);
} else if (body_len >= (10 + mask_len)) {
header_len = 10 + mask_len;
data_len = (((uint64_t)ntohl(*(uint32_t *)(void *)&buf[2]))
<< 32) + ntohl(*(uint32_t *)(void *)&buf[6]);
}
}
if (header_len > 0 && body_len >= header_len) {
/* Allocate space to hold websocket payload */
data = mem;
if (data_len > sizeof(mem)) {
data = (char *)XX_httplib_malloc(data_len);
if (data == NULL) {
/* Allocation failed, exit the loop and then close the
* connection */
mg_cry(conn, "websocket out of memory; closing connection");
break;
}
}
/* Copy the mask before we shift the queue and destroy it */
if (mask_len > 0) memcpy( mask, buf + header_len - mask_len, sizeof(mask) );
else memset( mask, 0, sizeof(mask) );
/* Read frame payload from the first message in the queue into
* data and advance the queue by moving the memory in place. */
assert(body_len >= header_len);
if (data_len + header_len > body_len) {
mop = buf[0]; /* current mask and opcode */
/* Overflow case */
len = body_len - header_len;
memcpy(data, buf + header_len, len);
error = 0;
while (len < data_len) {
n = pull( NULL, conn, data + len, (int)(data_len - len), timeout);
if (n <= 0) {
error = 1;
break;
}
len += (size_t)n;
}
if (error) {
mg_cry(conn, "Websocket pull failed; closing connection");
break;
}
conn->data_len = conn->request_len;
} else {
mop = buf[0]; /* current mask and opcode, overwritten by
* memmove() */
/* Length of the message being read at the front of the
* queue */
len = data_len + header_len;
/* Copy the data payload into the data pointer for the
* callback */
memcpy(data, buf + header_len, data_len);
/* Move the queue forward len bytes */
memmove(buf, buf + len, body_len - len);
/* Mark the queue as advanced */
conn->data_len -= (int)len;
}
/* Apply mask if necessary */
if (mask_len > 0) for (i = 0; i < data_len; ++i) data[i] ^= mask[i & 3];
/* Exit the loop if callback signals to exit (server side),
* or "connection close" opcode received (client side). */
exit_by_callback = 0;
if ((ws_data_handler != NULL) && !ws_data_handler(conn, mop, data, data_len, callback_data)) {
exit_by_callback = 1;
}
if (data != mem) XX_httplib_free(data);
if (exit_by_callback || ((mop & 0xf) == WEBSOCKET_OPCODE_CONNECTION_CLOSE)) {
/* Opcode == 8, connection close */
break;
}
/* Not breaking the loop, process next websocket frame. */
} else {
/* Read from the socket into the next available location in the
* message queue. */
if ((n = pull(NULL, conn, conn->buf + conn->data_len, conn->buf_size - conn->data_len, timeout)) <= 0) {
/* Error, no bytes read */
break;
}
conn->data_len += n;
}
}
XX_httplib_set_thread_name("worker");
} /* XX_httplib_read_websocket */
#endif /* !USE_WEBSOCKET */