mirror of
https://github.com/esp8266/Arduino.git
synced 2025-12-13 20:03:19 +03:00
ClientContext (tcp) updates (#5089)
* +sync, get/set default nodelay, sync * default nodelay=1 * update flush() * fix return value * ClientContext: put things together * ClientContext: fix debugging messages * WiFiClient: move static members out of the class, add comments * remove circular dependency * parameter and return value for Client::flush&stop, flush timeout raised to 300ms * tcp flush: restart timer on ack receive * OTA protocol needs setNoDelay(true) * fix Ethernet with Client changes * 1 line unredable -> 5 lines readable code * doc * Update client-class.rst * Added details for getters
This commit is contained in:
@@ -31,11 +31,14 @@ extern "C" void esp_schedule();
|
||||
|
||||
#include "DataSource.h"
|
||||
|
||||
bool getDefaultPrivateGlobalSyncValue ();
|
||||
|
||||
class ClientContext
|
||||
{
|
||||
public:
|
||||
ClientContext(tcp_pcb* pcb, discard_cb_t discard_cb, void* discard_cb_arg) :
|
||||
_pcb(pcb), _rx_buf(0), _rx_buf_offset(0), _discard_cb(discard_cb), _discard_cb_arg(discard_cb_arg), _refcnt(0), _next(0)
|
||||
_pcb(pcb), _rx_buf(0), _rx_buf_offset(0), _discard_cb(discard_cb), _discard_cb_arg(discard_cb_arg), _refcnt(0), _next(0),
|
||||
_sync(::getDefaultPrivateGlobalSyncValue())
|
||||
{
|
||||
tcp_setprio(pcb, TCP_PRIO_MIN);
|
||||
tcp_arg(pcb, this);
|
||||
@@ -44,7 +47,7 @@ public:
|
||||
tcp_err(pcb, &_s_error);
|
||||
tcp_poll(pcb, &_s_poll, 1);
|
||||
|
||||
// not enabled by default for 2.4.0
|
||||
// keep-alive not enabled by default
|
||||
//keepAlive();
|
||||
}
|
||||
|
||||
@@ -159,7 +162,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
bool getNoDelay()
|
||||
bool getNoDelay() const
|
||||
{
|
||||
if(!_pcb) {
|
||||
return false;
|
||||
@@ -167,17 +170,17 @@ public:
|
||||
return tcp_nagle_disabled(_pcb);
|
||||
}
|
||||
|
||||
void setTimeout(int timeout_ms)
|
||||
void setTimeout(int timeout_ms)
|
||||
{
|
||||
_timeout_ms = timeout_ms;
|
||||
}
|
||||
|
||||
int getTimeout()
|
||||
int getTimeout() const
|
||||
{
|
||||
return _timeout_ms;
|
||||
}
|
||||
|
||||
uint32_t getRemoteAddress()
|
||||
uint32_t getRemoteAddress() const
|
||||
{
|
||||
if(!_pcb) {
|
||||
return 0;
|
||||
@@ -186,7 +189,7 @@ public:
|
||||
return _pcb->remote_ip.addr;
|
||||
}
|
||||
|
||||
uint16_t getRemotePort()
|
||||
uint16_t getRemotePort() const
|
||||
{
|
||||
if(!_pcb) {
|
||||
return 0;
|
||||
@@ -195,7 +198,7 @@ public:
|
||||
return _pcb->remote_port;
|
||||
}
|
||||
|
||||
uint32_t getLocalAddress()
|
||||
uint32_t getLocalAddress() const
|
||||
{
|
||||
if(!_pcb) {
|
||||
return 0;
|
||||
@@ -204,7 +207,7 @@ public:
|
||||
return _pcb->local_ip.addr;
|
||||
}
|
||||
|
||||
uint16_t getLocalPort()
|
||||
uint16_t getLocalPort() const
|
||||
{
|
||||
if(!_pcb) {
|
||||
return 0;
|
||||
@@ -257,7 +260,7 @@ public:
|
||||
return size_read;
|
||||
}
|
||||
|
||||
char peek()
|
||||
char peek() const
|
||||
{
|
||||
if(!_rx_buf) {
|
||||
return 0;
|
||||
@@ -266,7 +269,7 @@ public:
|
||||
return reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset];
|
||||
}
|
||||
|
||||
size_t peekBytes(char *dst, size_t size)
|
||||
size_t peekBytes(char *dst, size_t size) const
|
||||
{
|
||||
if(!_rx_buf) {
|
||||
return 0;
|
||||
@@ -296,20 +299,48 @@ public:
|
||||
_rx_buf_offset = 0;
|
||||
}
|
||||
|
||||
void wait_until_sent()
|
||||
bool wait_until_sent(int max_wait_ms = WIFICLIENT_MAX_FLUSH_WAIT_MS)
|
||||
{
|
||||
// fix option 1 in
|
||||
// https://github.com/esp8266/Arduino/pull/3967#pullrequestreview-83451496
|
||||
// TODO: option 2
|
||||
// option 1 done
|
||||
// option 2 / _write_some() not necessary since _datasource is always nullptr here
|
||||
|
||||
#define WAIT_TRIES_MS 10 // at most 10ms
|
||||
if (!_pcb)
|
||||
return true;
|
||||
|
||||
int tries = 1+ WAIT_TRIES_MS;
|
||||
int loop = -1;
|
||||
int prevsndbuf = -1;
|
||||
max_wait_ms++;
|
||||
|
||||
while (state() == ESTABLISHED && tcp_sndbuf(_pcb) != TCP_SND_BUF && --tries) {
|
||||
_write_some();
|
||||
delay(1); // esp_ schedule+yield
|
||||
// wait for peer's acks to flush lwIP's output buffer
|
||||
|
||||
while (1) {
|
||||
|
||||
// force lwIP to send what can be sent
|
||||
tcp_output(_pcb);
|
||||
|
||||
int sndbuf = tcp_sndbuf(_pcb);
|
||||
if (sndbuf != prevsndbuf) {
|
||||
// send buffer has changed (or first iteration)
|
||||
// we received an ack: restart the loop counter
|
||||
prevsndbuf = sndbuf;
|
||||
loop = max_wait_ms;
|
||||
}
|
||||
|
||||
if (state() != ESTABLISHED || sndbuf == TCP_SND_BUF || --loop <= 0)
|
||||
break;
|
||||
|
||||
delay(1);
|
||||
}
|
||||
|
||||
#ifdef DEBUGV
|
||||
if (loop <= 0) {
|
||||
// wait until sent: timeout
|
||||
DEBUGV(":wustmo\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
return max_wait_ms > 0;
|
||||
}
|
||||
|
||||
uint8_t state() const
|
||||
@@ -321,7 +352,6 @@ public:
|
||||
return _pcb->state;
|
||||
}
|
||||
|
||||
|
||||
size_t write(const uint8_t* data, size_t size)
|
||||
{
|
||||
if (!_pcb) {
|
||||
@@ -379,6 +409,16 @@ public:
|
||||
return isKeepAliveEnabled()? _pcb->keep_cnt: 0;
|
||||
}
|
||||
|
||||
bool getSync () const
|
||||
{
|
||||
return _sync;
|
||||
}
|
||||
|
||||
void setSync (bool sync)
|
||||
{
|
||||
_sync = sync;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
bool _is_timeout()
|
||||
@@ -418,6 +458,10 @@ protected:
|
||||
esp_yield();
|
||||
} while(true);
|
||||
_send_waiting = 0;
|
||||
|
||||
if (_sync)
|
||||
wait_until_sent();
|
||||
|
||||
return _written;
|
||||
}
|
||||
|
||||
@@ -427,40 +471,50 @@ protected:
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t left = _datasource->available();
|
||||
size_t can_send = tcp_sndbuf(_pcb);
|
||||
if (_pcb->snd_queuelen >= TCP_SND_QUEUELEN) {
|
||||
can_send = 0;
|
||||
}
|
||||
size_t will_send = (can_send < left) ? can_send : left;
|
||||
DEBUGV(":wr %d %d %d\r\n", will_send, left, _written);
|
||||
bool need_output = false;
|
||||
while( will_send && _datasource) {
|
||||
size_t next_chunk =
|
||||
will_send > _write_chunk_size ? _write_chunk_size : will_send;
|
||||
const uint8_t* buf = _datasource->get_buffer(next_chunk);
|
||||
if (state() == CLOSED) {
|
||||
need_output = false;
|
||||
DEBUGV(":wr %d %d\r\n", _datasource->available(), _written);
|
||||
|
||||
bool has_written = false;
|
||||
|
||||
while (_datasource) {
|
||||
if (state() == CLOSED)
|
||||
return false;
|
||||
size_t next_chunk_size = std::min((size_t)tcp_sndbuf(_pcb), _datasource->available());
|
||||
if (!next_chunk_size)
|
||||
break;
|
||||
}
|
||||
err_t err = tcp_write(_pcb, buf, next_chunk, TCP_WRITE_FLAG_COPY);
|
||||
DEBUGV(":wrc %d %d %d\r\n", next_chunk, will_send, (int) err);
|
||||
const uint8_t* buf = _datasource->get_buffer(next_chunk_size);
|
||||
// use TCP_WRITE_FLAG_MORE to remove PUSH flag from packet (lwIP's doc),
|
||||
// because PUSH code implicitely disables Nagle code (see lwIP's tcp_out.c)
|
||||
// Notes:
|
||||
// PUSH is meant for peer, telling to give data to user app as soon as received
|
||||
// PUSH "may be set" when sender has finished sending a meaningful data block
|
||||
// PUSH is quite unclear in its application
|
||||
// Nagle is for shortly delaying outgoing data, to send less/bigger packets
|
||||
uint8_t flags = TCP_WRITE_FLAG_MORE; // do not tcp-PuSH
|
||||
if (!_sync)
|
||||
// user data must be copied when data are sent but not yet acknowledged
|
||||
// (with sync, we wait for acknowledgment before returning to user)
|
||||
flags |= TCP_WRITE_FLAG_COPY;
|
||||
err_t err = tcp_write(_pcb, buf, next_chunk_size, flags);
|
||||
DEBUGV(":wrc %d %d %d\r\n", next_chunk_size, _datasource->available(), (int)err);
|
||||
if (err == ERR_OK) {
|
||||
_datasource->release_buffer(buf, next_chunk);
|
||||
_written += next_chunk;
|
||||
need_output = true;
|
||||
_datasource->release_buffer(buf, next_chunk_size);
|
||||
_written += next_chunk_size;
|
||||
has_written = true;
|
||||
} else {
|
||||
// ERR_MEM(-1) is a valid error meaning
|
||||
// "come back later". It leaves state() opened
|
||||
break;
|
||||
}
|
||||
will_send -= next_chunk;
|
||||
}
|
||||
if( need_output ) {
|
||||
|
||||
if (has_written && (_sync || tcp_nagle_disabled(_pcb)))
|
||||
{
|
||||
// handle no-Nagle manually because of TCP_WRITE_FLAG_MORE
|
||||
// lwIP's tcp_output doc: "Find out what we can send and send it"
|
||||
tcp_output(_pcb);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
||||
return has_written;
|
||||
}
|
||||
|
||||
void _write_some_from_cb()
|
||||
@@ -482,14 +536,13 @@ protected:
|
||||
|
||||
void _consume(size_t size)
|
||||
{
|
||||
if(_pcb)
|
||||
tcp_recved(_pcb, size);
|
||||
ptrdiff_t left = _rx_buf->len - _rx_buf_offset - size;
|
||||
if(left > 0) {
|
||||
_rx_buf_offset += size;
|
||||
} else if(!_rx_buf->next) {
|
||||
DEBUGV(":c0 %d, %d\r\n", size, _rx_buf->tot_len);
|
||||
if(_pcb) {
|
||||
tcp_recved(_pcb, _rx_buf->len);
|
||||
}
|
||||
pbuf_free(_rx_buf);
|
||||
_rx_buf = 0;
|
||||
_rx_buf_offset = 0;
|
||||
@@ -499,9 +552,6 @@ protected:
|
||||
_rx_buf = _rx_buf->next;
|
||||
_rx_buf_offset = 0;
|
||||
pbuf_ref(_rx_buf);
|
||||
if(_pcb) {
|
||||
tcp_recved(_pcb, head->len);
|
||||
}
|
||||
pbuf_free(head);
|
||||
}
|
||||
}
|
||||
@@ -592,7 +642,6 @@ private:
|
||||
|
||||
DataSource* _datasource = nullptr;
|
||||
size_t _written = 0;
|
||||
size_t _write_chunk_size = 256;
|
||||
uint32_t _timeout_ms = 5000;
|
||||
uint32_t _op_start_time = 0;
|
||||
uint8_t _send_waiting = 0;
|
||||
@@ -600,6 +649,8 @@ private:
|
||||
|
||||
int8_t _refcnt;
|
||||
ClientContext* _next;
|
||||
|
||||
bool _sync;
|
||||
};
|
||||
|
||||
#endif//CLIENTCONTEXT_H
|
||||
|
||||
Reference in New Issue
Block a user