1
0
mirror of https://github.com/esp8266/Arduino.git synced 2025-06-03 07:02:28 +03:00

WifiClient::write refactoring (second attempt) (#2177)

* WiFiClient: use DataSource for writes

* ESP8266WebServer: delegate writing to WiFiClient

* ESP8266WebServer: set write timeout before sending content
This commit is contained in:
Ivan Grokhotkov 2016-06-23 17:47:18 +08:00 committed by GitHub
parent 5e3df08273
commit 8db4dcea42
7 changed files with 565 additions and 415 deletions

View File

@ -193,7 +193,7 @@ void ESP8266WebServer::handleClient() {
_currentStatus = HC_NONE;
return;
}
_currentClient.setTimeout(HTTP_MAX_SEND_WAIT);
_contentLength = CONTENT_LENGTH_NOT_SET;
_handleRequest();
@ -241,6 +241,9 @@ void ESP8266WebServer::sendHeader(const String& name, const String& value, bool
}
}
void ESP8266WebServer::setContentLength(size_t contentLength) {
_contentLength = contentLength;
}
void ESP8266WebServer::_prepareHeader(String& response, int code, const char* content_type, size_t contentLength) {
response = "HTTP/1.1 ";
@ -270,7 +273,6 @@ void ESP8266WebServer::send(int code, const char* content_type, const String& co
String header;
_prepareHeader(header, code, content_type, content.length());
sendContent(header);
sendContent(content);
}
@ -307,67 +309,15 @@ void ESP8266WebServer::send(int code, const String& content_type, const String&
}
void ESP8266WebServer::sendContent(const String& content) {
const size_t unit_size = HTTP_DOWNLOAD_UNIT_SIZE;
size_t size_to_send = content.length();
const char* send_start = content.c_str();
while (size_to_send) {
size_t will_send = (size_to_send < unit_size) ? size_to_send : unit_size;
size_t sent = _currentClient.write(send_start, will_send);
if (sent == 0) {
break;
}
size_to_send -= sent;
send_start += sent;
}
_currentClient.write(content.c_str(), content.length());
}
void ESP8266WebServer::sendContent_P(PGM_P content) {
char contentUnit[HTTP_DOWNLOAD_UNIT_SIZE + 1];
contentUnit[HTTP_DOWNLOAD_UNIT_SIZE] = '\0';
while (content != NULL) {
size_t contentUnitLen;
PGM_P contentNext;
// due to the memccpy signature, lots of casts are needed
contentNext = (PGM_P)memccpy_P((void*)contentUnit, (PGM_VOID_P)content, 0, HTTP_DOWNLOAD_UNIT_SIZE);
if (contentNext == NULL) {
// no terminator, more data available
content += HTTP_DOWNLOAD_UNIT_SIZE;
contentUnitLen = HTTP_DOWNLOAD_UNIT_SIZE;
}
else {
// reached terminator. Do not send the terminator
contentUnitLen = contentNext - contentUnit - 1;
content = NULL;
}
// write is so overloaded, had to use the cast to get it pick the right one
_currentClient.write((const char*)contentUnit, contentUnitLen);
}
_currentClient.write_P(content, strlen_P(content));
}
void ESP8266WebServer::sendContent_P(PGM_P content, size_t size) {
char contentUnit[HTTP_DOWNLOAD_UNIT_SIZE + 1];
contentUnit[HTTP_DOWNLOAD_UNIT_SIZE] = '\0';
size_t remaining_size = size;
while (content != NULL && remaining_size > 0) {
size_t contentUnitLen = HTTP_DOWNLOAD_UNIT_SIZE;
if (remaining_size < HTTP_DOWNLOAD_UNIT_SIZE) contentUnitLen = remaining_size;
// due to the memcpy signature, lots of casts are needed
memcpy_P((void*)contentUnit, (PGM_VOID_P)content, contentUnitLen);
content += contentUnitLen;
remaining_size -= contentUnitLen;
// write is so overloaded, had to use the cast to get it pick the right one
_currentClient.write((const char*)contentUnit, contentUnitLen);
}
_currentClient.write_P(content, size);
}

View File

@ -36,6 +36,7 @@ enum HTTPClientStatus { HC_NONE, HC_WAIT_READ, HC_WAIT_CLOSE };
#define HTTP_UPLOAD_BUFLEN 2048
#define HTTP_MAX_DATA_WAIT 1000 //ms to wait for the client to send the request
#define HTTP_MAX_POST_WAIT 1000 //ms to wait for POST data to arrive
#define HTTP_MAX_SEND_WAIT 5000 //ms to wait for data chunk to be ACKed
#define HTTP_MAX_CLOSE_WAIT 2000 //ms to wait for the client to close the connection
#define CONTENT_LENGTH_UNKNOWN ((size_t) -1)
@ -113,7 +114,7 @@ public:
void send_P(int code, PGM_P content_type, PGM_P content);
void send_P(int code, PGM_P content_type, PGM_P content, size_t contentLength);
void setContentLength(size_t contentLength) { _contentLength = contentLength; }
void setContentLength(size_t contentLength);
void sendHeader(const String& name, const String& value, bool first = false);
void sendContent(const String& content);
void sendContent_P(PGM_P content);
@ -129,7 +130,7 @@ template<typename T> size_t streamFile(T &file, const String& contentType){
sendHeader("Content-Encoding", "gzip");
}
send(200, contentType, "");
return _currentClient.write(file, HTTP_DOWNLOAD_UNIT_SIZE);
return _currentClient.write(file);
}
protected:

View File

@ -172,8 +172,21 @@ size_t WiFiClient::write(const uint8_t *buf, size_t size)
{
return 0;
}
return _client->write(buf, size);
}
return _client->write(reinterpret_cast<const char*>(buf), size);
size_t WiFiClient::write(Stream& stream, size_t unused)
{
return WiFiClient::write(stream);
}
size_t WiFiClient::write(Stream& stream)
{
if (!_client || !stream.available())
{
return 0;
}
return _client->write(stream);
}
size_t WiFiClient::write_P(PGM_P buf, size_t size)
@ -182,25 +195,7 @@ size_t WiFiClient::write_P(PGM_P buf, size_t size)
{
return 0;
}
char chunkUnit[WIFICLIENT_MAX_PACKET_SIZE + 1];
chunkUnit[WIFICLIENT_MAX_PACKET_SIZE] = '\0';
size_t remaining_size = size;
while (buf != NULL && remaining_size > 0) {
size_t chunkUnitLen = WIFICLIENT_MAX_PACKET_SIZE;
if (remaining_size < WIFICLIENT_MAX_PACKET_SIZE) chunkUnitLen = remaining_size;
// due to the memcpy signature, lots of casts are needed
memcpy_P((void*)chunkUnit, (PGM_VOID_P)buf, chunkUnitLen);
buf += chunkUnitLen;
remaining_size -= chunkUnitLen;
// write is so overloaded, had to use the cast to get it pick the right one
_client->write((const char*)chunkUnit, chunkUnitLen);
}
return size;
return _client->write_P(buf, size);
}
int WiFiClient::available()

View File

@ -49,8 +49,10 @@ public:
virtual size_t write(uint8_t);
virtual size_t write(const uint8_t *buf, size_t size);
size_t write_P(PGM_P buf, size_t size);
template <typename T>
size_t write(T& source, size_t unitSize);
size_t write(Stream& stream);
// This one is deprecated, use write(Stream& instead)
size_t write(Stream& stream, size_t unitSize) __attribute__ ((deprecated));
virtual int available();
virtual int read();
@ -73,28 +75,6 @@ public:
void setNoDelay(bool nodelay);
static void setLocalPortStart(uint16_t port) { _localPort = port; }
template<typename T> size_t write(T &src){
uint8_t obuf[WIFICLIENT_MAX_PACKET_SIZE];
size_t doneLen = 0;
size_t sentLen;
int i;
while (src.available() > WIFICLIENT_MAX_PACKET_SIZE){
src.read(obuf, WIFICLIENT_MAX_PACKET_SIZE);
sentLen = write(obuf, WIFICLIENT_MAX_PACKET_SIZE);
doneLen = doneLen + sentLen;
if(sentLen != WIFICLIENT_MAX_PACKET_SIZE){
return doneLen;
}
}
uint16_t leftLen = src.available();
src.read(obuf, leftLen);
sentLen = write(obuf, leftLen);
doneLen = doneLen + sentLen;
return doneLen;
}
friend class WiFiServer;
using Print::write;
@ -114,24 +94,4 @@ protected:
static uint16_t _localPort;
};
template <typename T>
inline size_t WiFiClient::write(T& source, size_t unitSize) {
std::unique_ptr<uint8_t[]> buffer(new uint8_t[unitSize]);
size_t size_sent = 0;
while(true) {
size_t left = source.available();
if (!left)
break;
size_t will_send = (left < unitSize) ? left : unitSize;
source.read(buffer.get(), will_send);
size_t cb = write(buffer.get(), will_send);
size_sent += cb;
if (cb != will_send) {
break;
}
}
return size_sent;
}
#endif

View File

@ -557,7 +557,8 @@ extern "C" int ax_port_write(int fd, uint8_t* buffer, size_t count) {
errno = EIO;
return -1;
}
size_t cb = _client->write((const char*) buffer, count);
size_t cb = _client->write(buffer, count);
if (cb != count) {
errno = EAGAIN;
}

View File

@ -35,317 +35,447 @@ typedef err_t recv_ret_t;
typedef int32_t recv_ret_t;
#endif
class ClientContext {
public:
ClientContext(tcp_pcb* pcb, discard_cb_t discard_cb, void* discard_cb_arg) :
_pcb(pcb), _rx_buf(0), _rx_buf_offset(0), _discard_cb(discard_cb), _discard_cb_arg(discard_cb_arg), _refcnt(0), _next(0), _send_waiting(false) {
tcp_setprio(pcb, TCP_PRIO_MIN);
tcp_arg(pcb, this);
tcp_recv(pcb, (tcp_recv_fn) &_s_recv);
tcp_sent(pcb, &_s_sent);
tcp_err(pcb, &_s_error);
}
#include "DataSource.h"
err_t abort(){
if(_pcb) {
DEBUGV(":abort\r\n");
tcp_arg(_pcb, NULL);
tcp_sent(_pcb, NULL);
tcp_recv(_pcb, NULL);
tcp_err(_pcb, NULL);
tcp_abort(_pcb);
_pcb = 0;
}
return ERR_ABRT;
}
class ClientContext
{
public:
ClientContext(tcp_pcb* pcb, discard_cb_t discard_cb, void* discard_cb_arg) :
_pcb(pcb), _rx_buf(0), _rx_buf_offset(0), _discard_cb(discard_cb), _discard_cb_arg(discard_cb_arg), _refcnt(0), _next(0)
{
tcp_setprio(pcb, TCP_PRIO_MIN);
tcp_arg(pcb, this);
tcp_recv(pcb, (tcp_recv_fn) &_s_recv);
tcp_sent(pcb, &_s_sent);
tcp_err(pcb, &_s_error);
tcp_poll(pcb, &_s_poll, 1);
}
err_t close(){
err_t err = ERR_OK;
if(_pcb) {
DEBUGV(":close\r\n");
tcp_arg(_pcb, NULL);
tcp_sent(_pcb, NULL);
tcp_recv(_pcb, NULL);
tcp_err(_pcb, NULL);
err = tcp_close(_pcb);
if(err != ERR_OK) {
DEBUGV(":tc err %d\r\n", err);
tcp_abort(_pcb);
err = ERR_ABRT;
}
_pcb = 0;
}
return err;
}
~ClientContext() {
}
ClientContext* next() const {
return _next;
}
ClientContext* next(ClientContext* new_next) {
_next = new_next;
return _next;
}
void ref() {
++_refcnt;
DEBUGV(":ref %d\r\n", _refcnt);
}
void unref() {
if(this != 0) {
DEBUGV(":ur %d\r\n", _refcnt);
if(--_refcnt == 0) {
flush();
close();
if(_discard_cb)
_discard_cb(_discard_cb_arg, this);
DEBUGV(":del\r\n");
delete this;
}
}
}
void setNoDelay(bool nodelay){
if(!_pcb) return;
if(nodelay) tcp_nagle_disable(_pcb);
else tcp_nagle_enable(_pcb);
}
bool getNoDelay(){
if(!_pcb) return false;
return tcp_nagle_disabled(_pcb);
}
uint32_t getRemoteAddress() {
if(!_pcb) return 0;
return _pcb->remote_ip.addr;
}
uint16_t getRemotePort() {
if(!_pcb) return 0;
return _pcb->remote_port;
}
uint32_t getLocalAddress() {
if(!_pcb) return 0;
return _pcb->local_ip.addr;
}
uint16_t getLocalPort() {
if(!_pcb) return 0;
return _pcb->local_port;
}
size_t getSize() const {
if(!_rx_buf) return 0;
return _rx_buf->tot_len - _rx_buf_offset;
}
char read() {
if(!_rx_buf) return 0;
char c = reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset];
_consume(1);
return c;
}
size_t read(char* dst, size_t size) {
if(!_rx_buf) return 0;
size_t max_size = _rx_buf->tot_len - _rx_buf_offset;
size = (size < max_size) ? size : max_size;
DEBUGV(":rd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset);
size_t size_read = 0;
while(size) {
size_t buf_size = _rx_buf->len - _rx_buf_offset;
size_t copy_size = (size < buf_size) ? size : buf_size;
DEBUGV(":rdi %d, %d\r\n", buf_size, copy_size);
os_memcpy(dst, reinterpret_cast<char*>(_rx_buf->payload) + _rx_buf_offset, copy_size);
dst += copy_size;
_consume(copy_size);
size -= copy_size;
size_read += copy_size;
}
return size_read;
}
char peek() {
if(!_rx_buf) return 0;
return reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset];
}
size_t peekBytes(char *dst, size_t size) {
if(!_rx_buf) return 0;
size_t max_size = _rx_buf->tot_len - _rx_buf_offset;
size = (size < max_size) ? size : max_size;
DEBUGV(":pd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset);
size_t buf_size = _rx_buf->len - _rx_buf_offset;
size_t copy_size = (size < buf_size) ? size : buf_size;
DEBUGV(":rpi %d, %d\r\n", buf_size, copy_size);
os_memcpy(dst, reinterpret_cast<char*>(_rx_buf->payload) + _rx_buf_offset, copy_size);
return copy_size;
}
void flush() {
if(!_rx_buf) {
return;
}
if(_pcb) {
tcp_recved(_pcb, (size_t) _rx_buf->tot_len);
}
pbuf_free(_rx_buf);
_rx_buf = 0;
_rx_buf_offset = 0;
}
uint8_t state() const {
if(!_pcb) return CLOSED;
return _pcb->state;
}
size_t write(const char* data, size_t size) {
if(!_pcb) {
DEBUGV(":wr !_pcb\r\n");
return 0;
}
if(size == 0) {
return 0;
}
size_t room = tcp_sndbuf(_pcb);
size_t will_send = (room < size) ? room : size;
err_t err = tcp_write(_pcb, data, will_send, 0);
if(err != ERR_OK) {
DEBUGV(":wr !ERR_OK\r\n");
return 0;
}
_size_sent = will_send;
DEBUGV(":wr\r\n");
tcp_output( _pcb );
_send_waiting = true;
delay(5000); // max send timeout
_send_waiting = false;
DEBUGV(":ww\r\n");
return will_send - _size_sent;
}
private:
err_t _sent(tcp_pcb* pcb, uint16_t len) {
DEBUGV(":sent %d\r\n", len);
_size_sent -= len;
if(_size_sent == 0 && _send_waiting) esp_schedule();
return ERR_OK;
}
void _consume(size_t size) {
ptrdiff_t left = _rx_buf->len - _rx_buf_offset - size;
if(left > 0) {
_rx_buf_offset += size;
} else if(!_rx_buf->next) {
DEBUGV(":c0 %d, %d\r\n", size, _rx_buf->tot_len);
if(_pcb) tcp_recved(_pcb, _rx_buf->len);
pbuf_free(_rx_buf);
_rx_buf = 0;
_rx_buf_offset = 0;
} else {
DEBUGV(":c %d, %d, %d\r\n", size, _rx_buf->len, _rx_buf->tot_len);
auto head = _rx_buf;
_rx_buf = _rx_buf->next;
_rx_buf_offset = 0;
pbuf_ref(_rx_buf);
if(_pcb) tcp_recved(_pcb, head->len);
pbuf_free(head);
}
}
recv_ret_t _recv(tcp_pcb* pcb, pbuf* pb, err_t err) {
if(pb == 0) // connection closed
{
DEBUGV(":rcl\r\n");
if (_send_waiting) {
esp_schedule();
}
abort();
return ERR_ABRT;
}
if(_rx_buf) {
// there is some unread data
// chain the new pbuf to the existing one
DEBUGV(":rch %d, %d\r\n", _rx_buf->tot_len, pb->tot_len);
pbuf_cat(_rx_buf, pb);
} else {
DEBUGV(":rn %d\r\n", pb->tot_len);
_rx_buf = pb;
_rx_buf_offset = 0;
}
return ERR_OK;
}
void _error(err_t err) {
DEBUGV(":er %d %d %d\r\n", err, _size_sent, _send_waiting);
err_t abort()
{
if(_pcb) {
DEBUGV(":abort\r\n");
tcp_arg(_pcb, NULL);
tcp_sent(_pcb, NULL);
tcp_recv(_pcb, NULL);
tcp_err(_pcb, NULL);
_pcb = NULL;
if(_size_sent && _send_waiting) {
esp_schedule();
tcp_poll(_pcb, NULL, 0);
tcp_abort(_pcb);
_pcb = 0;
}
return ERR_ABRT;
}
err_t close()
{
err_t err = ERR_OK;
if(_pcb) {
DEBUGV(":close\r\n");
tcp_arg(_pcb, NULL);
tcp_sent(_pcb, NULL);
tcp_recv(_pcb, NULL);
tcp_err(_pcb, NULL);
tcp_poll(_pcb, NULL, 0);
err = tcp_close(_pcb);
if(err != ERR_OK) {
DEBUGV(":tc err %d\r\n", err);
tcp_abort(_pcb);
err = ERR_ABRT;
}
_pcb = 0;
}
return err;
}
~ClientContext()
{
}
ClientContext* next() const
{
return _next;
}
ClientContext* next(ClientContext* new_next)
{
_next = new_next;
return _next;
}
void ref()
{
++_refcnt;
DEBUGV(":ref %d\r\n", _refcnt);
}
void unref()
{
if(this != 0) {
DEBUGV(":ur %d\r\n", _refcnt);
if(--_refcnt == 0) {
flush();
close();
if(_discard_cb) {
_discard_cb(_discard_cb_arg, this);
}
DEBUGV(":del\r\n");
delete this;
}
}
}
void setNoDelay(bool nodelay)
{
if(!_pcb) {
return;
}
if(nodelay) {
tcp_nagle_disable(_pcb);
} else {
tcp_nagle_enable(_pcb);
}
}
bool getNoDelay()
{
if(!_pcb) {
return false;
}
return tcp_nagle_disabled(_pcb);
}
void setNonBlocking(bool nonblocking)
{
_noblock = nonblocking;
}
bool getNonBlocking()
{
return _noblock;
}
uint32_t getRemoteAddress()
{
if(!_pcb) {
return 0;
}
return _pcb->remote_ip.addr;
}
uint16_t getRemotePort()
{
if(!_pcb) {
return 0;
}
return _pcb->remote_port;
}
uint32_t getLocalAddress()
{
if(!_pcb) {
return 0;
}
return _pcb->local_ip.addr;
}
uint16_t getLocalPort()
{
if(!_pcb) {
return 0;
}
return _pcb->local_port;
}
size_t getSize() const
{
if(!_rx_buf) {
return 0;
}
return _rx_buf->tot_len - _rx_buf_offset;
}
char read()
{
if(!_rx_buf) {
return 0;
}
char c = reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset];
_consume(1);
return c;
}
size_t read(char* dst, size_t size)
{
if(!_rx_buf) {
return 0;
}
size_t max_size = _rx_buf->tot_len - _rx_buf_offset;
size = (size < max_size) ? size : max_size;
DEBUGV(":rd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset);
size_t size_read = 0;
while(size) {
size_t buf_size = _rx_buf->len - _rx_buf_offset;
size_t copy_size = (size < buf_size) ? size : buf_size;
DEBUGV(":rdi %d, %d\r\n", buf_size, copy_size);
os_memcpy(dst, reinterpret_cast<char*>(_rx_buf->payload) + _rx_buf_offset, copy_size);
dst += copy_size;
_consume(copy_size);
size -= copy_size;
size_read += copy_size;
}
return size_read;
}
char peek()
{
if(!_rx_buf) {
return 0;
}
return reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset];
}
size_t peekBytes(char *dst, size_t size)
{
if(!_rx_buf) {
return 0;
}
size_t max_size = _rx_buf->tot_len - _rx_buf_offset;
size = (size < max_size) ? size : max_size;
DEBUGV(":pd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset);
size_t buf_size = _rx_buf->len - _rx_buf_offset;
size_t copy_size = (size < buf_size) ? size : buf_size;
DEBUGV(":rpi %d, %d\r\n", buf_size, copy_size);
os_memcpy(dst, reinterpret_cast<char*>(_rx_buf->payload) + _rx_buf_offset, copy_size);
return copy_size;
}
void flush()
{
if(!_rx_buf) {
return;
}
if(_pcb) {
tcp_recved(_pcb, (size_t) _rx_buf->tot_len);
}
pbuf_free(_rx_buf);
_rx_buf = 0;
_rx_buf_offset = 0;
}
uint8_t state() const
{
if(!_pcb) {
return CLOSED;
}
return _pcb->state;
}
size_t write(const uint8_t* data, size_t size)
{
if (!_pcb) {
return 0;
}
return _write_from_source(new BufferDataSource(data, size));
}
size_t write(Stream& stream)
{
if (!_pcb) {
return 0;
}
return _write_from_source(new BufferedStreamDataSource<Stream>(stream, stream.available()));
}
size_t write_P(PGM_P buf, size_t size)
{
if (!_pcb) {
return 0;
}
ProgmemStream stream(buf, size);
return _write_from_source(new BufferedStreamDataSource<ProgmemStream>(stream, size));
}
protected:
void _cancel_write()
{
if (_datasource) {
delete _datasource;
_datasource = nullptr;
esp_schedule();
}
}
size_t _write_from_source(DataSource* ds)
{
assert(_datasource == nullptr);
_datasource = ds;
_written = 0;
_write_some();
while (_datasource && !_noblock) {
_send_waiting = true;
esp_yield();
}
_send_waiting = false;
return _written;
}
void _write_some()
{
if (!_datasource || !_pcb) {
return;
}
size_t left = _datasource->available();
size_t can_send = tcp_sndbuf(_pcb);
if (_pcb->snd_queuelen >= TCP_SND_QUEUELEN) {
can_send = 0;
}
size_t will_send = (can_send < left) ? can_send : left;
if (will_send) {
const uint8_t* buf = _datasource->get_buffer(will_send);
err_t err = tcp_write(_pcb, buf, will_send, TCP_WRITE_FLAG_COPY);
_datasource->release_buffer(buf, will_send);
if (err == ERR_OK) {
_written += will_send;
tcp_output(_pcb);
}
}
err_t _poll(tcp_pcb* pcb) {
return ERR_OK;
if (!_datasource->available() || _noblock) {
delete _datasource;
_datasource = nullptr;
}
}
void _write_some_from_cb()
{
_write_some();
if (!_datasource && _send_waiting) {
esp_schedule();
}
}
err_t _sent(tcp_pcb* pcb, uint16_t len)
{
DEBUGV(":sent %d\r\n", len);
_write_some_from_cb();
return ERR_OK;
}
void _consume(size_t size)
{
ptrdiff_t left = _rx_buf->len - _rx_buf_offset - size;
if(left > 0) {
_rx_buf_offset += size;
} else if(!_rx_buf->next) {
DEBUGV(":c0 %d, %d\r\n", size, _rx_buf->tot_len);
if(_pcb) {
tcp_recved(_pcb, _rx_buf->len);
}
pbuf_free(_rx_buf);
_rx_buf = 0;
_rx_buf_offset = 0;
} else {
DEBUGV(":c %d, %d, %d\r\n", size, _rx_buf->len, _rx_buf->tot_len);
auto head = _rx_buf;
_rx_buf = _rx_buf->next;
_rx_buf_offset = 0;
pbuf_ref(_rx_buf);
if(_pcb) {
tcp_recved(_pcb, head->len);
}
pbuf_free(head);
}
}
recv_ret_t _recv(tcp_pcb* pcb, pbuf* pb, err_t err)
{
if(pb == 0) { // connection closed
DEBUGV(":rcl\r\n");
_cancel_write();
abort();
return ERR_ABRT;
}
static recv_ret_t _s_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *pb, err_t err) {
return reinterpret_cast<ClientContext*>(arg)->_recv(tpcb, pb, err);
if(_rx_buf) {
DEBUGV(":rch %d, %d\r\n", _rx_buf->tot_len, pb->tot_len);
pbuf_cat(_rx_buf, pb);
} else {
DEBUGV(":rn %d\r\n", pb->tot_len);
_rx_buf = pb;
_rx_buf_offset = 0;
}
return ERR_OK;
}
static void _s_error(void *arg, err_t err) {
reinterpret_cast<ClientContext*>(arg)->_error(err);
}
void _error(err_t err)
{
DEBUGV(":er %d %08x\r\n", err, (uint32_t) _datasource);
tcp_arg(_pcb, NULL);
tcp_sent(_pcb, NULL);
tcp_recv(_pcb, NULL);
tcp_err(_pcb, NULL);
_pcb = NULL;
_cancel_write();
}
static err_t _s_poll(void *arg, struct tcp_pcb *tpcb) {
return reinterpret_cast<ClientContext*>(arg)->_poll(tpcb);
}
err_t _poll(tcp_pcb*)
{
_write_some_from_cb();
return ERR_OK;
}
static err_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len) {
return reinterpret_cast<ClientContext*>(arg)->_sent(tpcb, len);
}
static recv_ret_t _s_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *pb, err_t err)
{
return reinterpret_cast<ClientContext*>(arg)->_recv(tpcb, pb, err);
}
private:
tcp_pcb* _pcb;
static void _s_error(void *arg, err_t err)
{
reinterpret_cast<ClientContext*>(arg)->_error(err);
}
pbuf* _rx_buf;
size_t _rx_buf_offset;
static err_t _s_poll(void *arg, struct tcp_pcb *tpcb)
{
return reinterpret_cast<ClientContext*>(arg)->_poll(tpcb);
}
discard_cb_t _discard_cb;
void* _discard_cb_arg;
static err_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len)
{
return reinterpret_cast<ClientContext*>(arg)->_sent(tpcb, len);
}
int _refcnt;
ClientContext* _next;
private:
tcp_pcb* _pcb;
size_t _size_sent;
bool _send_waiting;
pbuf* _rx_buf;
size_t _rx_buf_offset;
discard_cb_t _discard_cb;
void* _discard_cb_arg;
int _refcnt;
ClientContext* _next;
DataSource* _datasource = nullptr;
size_t _written = 0;
bool _noblock = false;
bool _send_waiting = false;
};
#endif//CLIENTCONTEXT_H

View File

@ -0,0 +1,113 @@
/* DataSource.h - a read-only object similar to Stream, but with less methods
* Copyright (c) 2016 Ivan Grokhotkov. All rights reserved.
* This file is distributed under MIT license.
*/
#ifndef DATASOURCE_H
#define DATASOURCE_H
#include <assert.h>
class DataSource {
public:
virtual ~DataSource() {}
virtual size_t available() = 0;
virtual const uint8_t* get_buffer(size_t size) = 0;
virtual void release_buffer(const uint8_t* buffer, size_t size) = 0;
};
class BufferDataSource : public DataSource {
public:
BufferDataSource(const uint8_t* data, size_t size) :
_data(data),
_size(size)
{
}
size_t available() override
{
return _size - _pos;
}
const uint8_t* get_buffer(size_t size) override
{
assert(_pos + size <= _size);
return _data + _pos;
}
void release_buffer(const uint8_t* buffer, size_t size) override
{
assert(buffer == _data + _pos);
_pos += size;
}
protected:
const uint8_t* _data;
const size_t _size;
size_t _pos = 0;
};
template<typename TStream>
class BufferedStreamDataSource : public DataSource {
public:
BufferedStreamDataSource(TStream& stream, size_t size) :
_stream(stream),
_size(size)
{
}
size_t available() override
{
return _size - _pos;
}
const uint8_t* get_buffer(size_t size) override
{
assert(_pos + size <= _size);
if (_bufferSize < size) {
_buffer.reset(new uint8_t[size]);
_bufferSize = size;
}
size_t cb = _stream.readBytes(reinterpret_cast<char*>(_buffer.get()), size);
assert(cb == size);
return _buffer.get();
}
void release_buffer(const uint8_t* buffer, size_t size) override
{
_pos += size;
}
protected:
TStream& _stream;
std::unique_ptr<uint8_t[]> _buffer;
size_t _size;
size_t _pos = 0;
size_t _bufferSize = 0;
};
class ProgmemStream
{
public:
ProgmemStream(PGM_P buf, size_t size) :
_buf(buf),
_left(size)
{
}
size_t readBytes(char* dst, size_t size)
{
size_t will_read = (_left < size) ? _left : size;
memcpy_P((void*)dst, (PGM_VOID_P)_buf, will_read);
_left -= will_read;
_buf += will_read;
return will_read;
}
protected:
PGM_P _buf;
size_t _left;
};
#endif //DATASOURCE_H