diff --git a/cores/esp8266/Stream.cpp b/cores/esp8266/Stream.cpp index 3867ea6c0..a51bfa929 100644 --- a/cores/esp8266/Stream.cpp +++ b/cores/esp8266/Stream.cpp @@ -36,6 +36,7 @@ int ICACHE_FLASH_ATTR Stream::timedRead() do { c = read(); if (c >= 0) return c; + yield(); } while(millis() - _startMillis < _timeout); return -1; // -1 indicates timeout } @@ -48,6 +49,7 @@ int ICACHE_FLASH_ATTR Stream::timedPeek() do { c = peek(); if (c >= 0) return c; + yield(); } while(millis() - _startMillis < _timeout); return -1; // -1 indicates timeout } diff --git a/libraries/ESP8266WiFi/src/include/ClientContext.h b/libraries/ESP8266WiFi/src/include/ClientContext.h index fde378491..071f1405f 100644 --- a/libraries/ESP8266WiFi/src/include/ClientContext.h +++ b/libraries/ESP8266WiFi/src/include/ClientContext.h @@ -36,7 +36,9 @@ public: ClientContext(tcp_pcb* pcb, size_t rx_buffer_size, discard_cb_t discard_cb, void* discard_cb_arg) : _pcb(pcb) - , _rx_buf(rx_buffer_size) + // , _rx_buf(rx_buffer_size) + , _rx_buf(0) + , _rx_buf_offset(0) , _discard_cb(discard_cb) , _discard_cb_arg(discard_cb_arg) , _refcnt(0) @@ -72,7 +74,7 @@ public: void unref() { - DEBUGV("WC:ur %d\r\n", _refcnt); + DEBUGV(":ur %d\r\n", _refcnt); if (--_refcnt == 0) { if (_pcb) @@ -90,28 +92,64 @@ public: size_t getSize() const { - return _rx_buf.getSize(); + if (!_rx_buf) + return 0; + + return _rx_buf->tot_len - _rx_buf_offset; } char read() { - return _rx_buf.read(); + if (!_rx_buf) + return 0; + + char c = reinterpret_cast(_rx_buf->payload)[_rx_buf_offset]; + _consume(1); + return c; } size_t read(char* dst, size_t size) { - DEBUGV("WC:rd\r\n"); - return _rx_buf.read(dst, size); + if (!_rx_buf) + return 0; + + size_t max_size = _rx_buf->tot_len - _rx_buf_offset; + size = (size < max_size) ? size : max_size; + + DEBUGV(":rd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset); + size_t size_read = 0; + while(size) + { + size_t buf_size = _rx_buf->len - _rx_buf_offset; + size_t copy_size = (size < buf_size) ? size : buf_size; + DEBUGV(":rdi %d, %d\r\n", buf_size, copy_size); + os_memcpy(dst, reinterpret_cast(_rx_buf->payload) + _rx_buf_offset, copy_size); + dst += copy_size; + _consume(copy_size); + size -= copy_size; + size_read += copy_size; + } + return size_read; } char peek() { - return _rx_buf.peek(); + if (!_rx_buf) + return 0; + + return reinterpret_cast(_rx_buf->payload)[_rx_buf_offset]; } void flush() { - return _rx_buf.flush(); + if (!_rx_buf) + return; + + size_t len = _rx_buf->tot_len; + tcp_recved(_pcb, len); + pbuf_free(_rx_buf); + _rx_buf = 0; + _rx_buf_offset = 0; } uint8_t state() const @@ -134,23 +172,50 @@ public: return 0; _size_sent = will_send; - DEBUGV("WC:wr\r\n"); + DEBUGV(":wr\r\n"); _send_waiting = true; delay(5000); // max send timeout _send_waiting = false; - DEBUGV("WC:ww\r\n"); + DEBUGV(":ww\r\n"); return will_send - _size_sent; } private: + void _consume(size_t size) + { + ptrdiff_t left = _rx_buf->len - _rx_buf_offset - size; + if (left > 0) + { + _rx_buf_offset += size; + } + else if (!_rx_buf->next) + { + DEBUGV(":c0 %d, %d\r\n", size, _rx_buf->tot_len); + tcp_recved(_pcb, _rx_buf->len); + pbuf_free(_rx_buf); + _rx_buf = 0; + _rx_buf_offset = 0; + } + else + { + DEBUGV(":c %d, %d, %d\r\n", size, _rx_buf->len, _rx_buf->tot_len); + auto head = _rx_buf; + _rx_buf = _rx_buf->next; + _rx_buf_offset = 0; + pbuf_ref(_rx_buf); + tcp_recved(_pcb, head->len); + pbuf_free(head); + } + } + err_t _recv(tcp_pcb* pcb, pbuf* pb, err_t err) { if (pb == 0) // connection closed { - DEBUGV("WC:rcl\r\n"); + DEBUGV(":rcl\r\n"); tcp_arg(pcb, NULL); tcp_sent(pcb, NULL); tcp_recv(pcb, NULL); @@ -158,7 +223,7 @@ private: int error = tcp_close(pcb); if (error != ERR_OK) { - DEBUGV("WC:rcla\r\n"); + DEBUGV(":rcla\r\n"); tcp_abort(pcb); _pcb = 0; return ERR_ABRT; @@ -167,17 +232,27 @@ private: return ERR_OK; } - size_t len = pb->len; - DEBUGV("WC:rcr %d\r\n", len); - _rx_buf.write(reinterpret_cast(pb->payload), pb->len); - tcp_recved(pcb, len); - pbuf_free(pb); + if (_rx_buf) + { + // there is some unread data + // chain the new pbuf to the existing one + DEBUGV(":rch %d, %d\r\n", _rx_buf->tot_len, pb->tot_len); + pbuf_cat(_rx_buf, pb); + } + else + { + DEBUGV(":rn %d\r\n", pb->tot_len); + _rx_buf = pb; + _rx_buf_offset = 0; + } + // tcp_recved(pcb, received); + // pbuf_free(pb); return ERR_OK; } void _error(err_t err) { - DEBUGV("WC:er\r\n"); + DEBUGV(":er\r\n"); _pcb = 0; if (_size_sent && _send_waiting) esp_schedule(); @@ -191,7 +266,7 @@ private: err_t _sent(tcp_pcb* pcb, uint16_t len) { - DEBUGV("WC:sent %d\r\n", len); + DEBUGV(":sent %d\r\n", len); _size_sent -= len; if (_size_sent == 0 && _send_waiting) esp_schedule(); @@ -223,7 +298,9 @@ private: ClientContext* _next; int _refcnt; tcp_pcb* _pcb; - cbuf _rx_buf; + pbuf* _rx_buf; + size_t _rx_buf_offset; + // cbuf _rx_buf; discard_cb_t _discard_cb; void* _discard_cb_arg; size_t _size_sent;