1
0
mirror of https://github.com/esp8266/Arduino.git synced 2025-07-27 18:02:17 +03:00

Stream::send() (#6979)

This commit is contained in:
david gauchard
2021-03-15 01:36:20 +01:00
committed by GitHub
parent 4cc1472821
commit c720c0d9e8
48 changed files with 2136 additions and 650 deletions

View File

@ -0,0 +1,141 @@
/*
WiFiEcho - Echo server
released to public domain
*/
#include <ESP8266WiFi.h>
#include <ESP8266mDNS.h>
#include <PolledTimeout.h>
#include <algorithm> // std::min
#ifndef STASSID
#define STASSID "your-ssid"
#define STAPSK "your-password"
#endif
constexpr int port = 23;
WiFiServer server(port);
WiFiClient client;
constexpr size_t sizes [] = { 0, 512, 384, 256, 128, 64, 16, 8, 4 };
constexpr uint32_t breathMs = 200;
esp8266::polledTimeout::oneShotFastMs enoughMs(breathMs);
esp8266::polledTimeout::periodicFastMs test(2000);
int t = 1; // test (1, 2 or 3, see below)
int s = 0; // sizes[] index
void setup() {
Serial.begin(115200);
Serial.println(ESP.getFullVersion());
WiFi.mode(WIFI_STA);
WiFi.begin(STASSID, STAPSK);
Serial.print("\nConnecting to ");
Serial.println(STASSID);
while (WiFi.status() != WL_CONNECTED) {
Serial.print('.');
delay(500);
}
Serial.println();
Serial.print("connected, address=");
Serial.println(WiFi.localIP());
server.begin();
MDNS.begin("echo23");
Serial.printf("Ready!\n"
"- Use 'telnet/nc echo23.local %d' to try echo\n\n"
"- Use 'python3 echo-client.py' bandwidth meter to compare transfer APIs\n\n"
" and try typing 1, 1, 1, 2, 2, 2, 3, 3, 3 on console during transfers\n\n",
port);
}
void loop() {
MDNS.update();
static uint32_t tot = 0;
static uint32_t cnt = 0;
if (test && cnt) {
Serial.printf("measured-block-size=%u min-free-stack=%u", tot / cnt, ESP.getFreeContStack());
if (t == 2 && sizes[s]) {
Serial.printf(" (blocks: at most %d bytes)", sizes[s]);
}
if (t == 3 && sizes[s]) {
Serial.printf(" (blocks: exactly %d bytes)", sizes[s]);
}
if (t == 3 && !sizes[s]) {
Serial.printf(" (blocks: any size)");
}
Serial.printf("\n");
}
//check if there are any new clients
if (server.hasClient()) {
client = server.available();
Serial.println("New client");
}
if (Serial.available()) {
s = (s + 1) % (sizeof(sizes) / sizeof(sizes[0]));
switch (Serial.read()) {
case '1': if (t != 1) s = 0; t = 1; Serial.println("byte-by-byte (watch then press 2 or 3)"); break;
case '2': if (t != 2) s = 1; t = 2; Serial.printf("through buffer (watch then press 2 again, or 1 or 3)\n"); break;
case '3': if (t != 3) s = 0; t = 3; Serial.printf("direct access (watch then press 3 again, or 1 or 2)\n"); break;
}
tot = cnt = 0;
ESP.resetFreeContStack();
}
enoughMs.reset(breathMs);
if (t == 1) {
// byte by byte
while (client.available() && client.availableForWrite() && !enoughMs) {
// working char by char is not efficient
client.write(client.read());
cnt++;
tot += 1;
}
}
else if (t == 2) {
// block by block through a local buffer (2 copies)
while (client.available() && client.availableForWrite() && !enoughMs) {
size_t maxTo = std::min(client.available(), client.availableForWrite());
maxTo = std::min(maxTo, sizes[s]);
uint8_t buf[maxTo];
size_t tcp_got = client.read(buf, maxTo);
size_t tcp_sent = client.write(buf, tcp_got);
if (tcp_sent != maxTo) {
Serial.printf("len mismatch: available:%zd tcp-read:%zd serial-write:%zd\n", maxTo, tcp_got, tcp_sent);
}
tot += tcp_sent;
cnt++;
}
}
else if (t == 3) {
// stream to print, possibly with only one copy
if (sizes[s]) {
tot += client.sendSize(&client, sizes[s]);
} else {
tot += client.sendAll(&client);
}
cnt++;
switch (client.getLastSendReport()) {
case Stream::Report::Success: break;
case Stream::Report::TimedOut: Serial.println("Stream::send: timeout"); break;
case Stream::Report::ReadError: Serial.println("Stream::send: read error"); break;
case Stream::Report::WriteError: Serial.println("Stream::send: write error"); break;
case Stream::Report::ShortOperation: Serial.println("Stream::send: short transfer"); break;
}
}
}

View File

@ -0,0 +1,49 @@
#!/usr/bin/env python3
import os
import asyncio
# 512 bytes
message = bytearray(512);
bufsize=len(message)
print('message len=', bufsize)
global recv
recv = 0
async def tcp_echo_open (ip, port):
return await asyncio.open_connection(ip, port)
async def tcp_echo_sender(message, writer):
print('Writer started')
while True:
writer.write(message)
await writer.drain()
async def tcp_echo_receiver(message, reader):
global recv
print('Reader started')
while True:
data = ''.encode('utf8')
while len(data) < bufsize:
data += await reader.read(bufsize - len(data))
recv += len(data);
if data != message:
print('error')
async def tcp_stat():
global recv
dur = 0
loopsec = 2
while True:
last = recv
await asyncio.sleep(loopsec) # drifting
dur += loopsec
print('BW=', (recv - last) * 2 * 8 / 1024 / loopsec, 'Kibits/s avg=', recv * 2 * 8 / 1024 / dur)
loop = asyncio.get_event_loop()
reader, writer = loop.run_until_complete(tcp_echo_open('echo23.local', 23))
loop.create_task(tcp_echo_receiver(message, reader))
loop.create_task(tcp_echo_sender(message, writer))
loop.create_task(tcp_stat())
loop.run_forever()

View File

@ -125,7 +125,7 @@ int CertStore::initCertStore(fs::FS &fs, const char *indexFileName, const char *
uint8_t fileHeader[60];
// 0..15 = filename in ASCII
// 48...57 = length in decimal ASCII
uint32_t length;
int32_t length;
if (data.read(fileHeader, sizeof(fileHeader)) != sizeof(fileHeader)) {
break;
}
@ -201,7 +201,7 @@ const br_x509_trust_anchor *CertStore::findHashedTA(void *ctx, void *hashed_dn,
free(der);
return nullptr;
}
if (data.read((uint8_t *)der, ci.length) != ci.length) {
if (data.read(der, ci.length) != (int)ci.length) {
free(der);
return nullptr;
}

View File

@ -244,7 +244,7 @@ size_t WiFiClient::write_P(PGM_P buf, size_t size)
int WiFiClient::available()
{
if (!_client)
return false;
return 0;
int result = _client->getSize();
@ -262,10 +262,14 @@ int WiFiClient::read()
return _client->read();
}
int WiFiClient::read(uint8_t* buf, size_t size)
{
return (int) _client->read(reinterpret_cast<char*>(buf), size);
return (int)_client->read((char*)buf, size);
}
int WiFiClient::read(char* buf, size_t size)
{
return (int)_client->read(buf, size);
}
int WiFiClient::peek()
@ -412,3 +416,28 @@ uint8_t WiFiClient::getKeepAliveCount () const
{
return _client->getKeepAliveCount();
}
bool WiFiClient::hasPeekBufferAPI () const
{
return true;
}
// return a pointer to available data buffer (size = peekAvailable())
// semantic forbids any kind of read() before calling peekConsume()
const char* WiFiClient::peekBuffer ()
{
return _client? _client->peekBuffer(): nullptr;
}
// return number of byte accessible by peekBuffer()
size_t WiFiClient::peekAvailable ()
{
return _client? _client->peekAvailable(): 0;
}
// consume bytes after use (see peekBuffer)
void WiFiClient::peekConsume (size_t consume)
{
if (_client)
_client->peekConsume(consume);
}

View File

@ -66,7 +66,9 @@ public:
virtual int available() override;
virtual int read() override;
virtual int read(uint8_t *buf, size_t size) override;
virtual int read(uint8_t* buf, size_t size) override;
int read(char* buf, size_t size);
virtual int peek() override;
virtual size_t peekBytes(uint8_t *buffer, size_t length);
size_t peekBytes(char *buffer, size_t length) {
@ -120,6 +122,22 @@ public:
bool getSync() const;
void setSync(bool sync);
// peek buffer API is present
virtual bool hasPeekBufferAPI () const override;
// return number of byte accessible by peekBuffer()
virtual size_t peekAvailable () override;
// return a pointer to available data buffer (size = peekAvailable())
// semantic forbids any kind of read() before calling peekConsume()
virtual const char* peekBuffer () override;
// consume bytes after use (see peekBuffer)
virtual void peekConsume (size_t consume) override;
virtual bool outputCanTimeout () override { return connected(); }
virtual bool inputCanTimeout () override { return connected(); }
protected:
static int8_t _s_connected(void* arg, void* tpcb, int8_t err);

View File

@ -362,6 +362,22 @@ int WiFiClientSecureCtx::read(uint8_t *buf, size_t size) {
return 0; // If we're connected, no error but no read.
}
// return a pointer to available data buffer (size = peekAvailable())
// semantic forbids any kind of read() before calling peekConsume()
const char* WiFiClientSecureCtx::peekBuffer ()
{
return (const char*)_recvapp_buf;
}
// consume bytes after use (see peekBuffer)
void WiFiClientSecureCtx::peekConsume (size_t consume)
{
// according to WiFiClientSecureCtx::read:
br_ssl_engine_recvapp_ack(_eng, consume);
_recvapp_buf = nullptr;
_recvapp_len = 0;
}
int WiFiClientSecureCtx::read() {
uint8_t c;
if (1 == read(&c, 1)) {

View File

@ -48,6 +48,7 @@ class WiFiClientSecureCtx : public WiFiClient {
size_t write_P(PGM_P buf, size_t size) override;
size_t write(Stream& stream); // Note this is not virtual
int read(uint8_t *buf, size_t size) override;
int read(char *buf, size_t size) { return read((uint8_t*)buf, size); }
int available() override;
int read() override;
int peek() override;
@ -120,6 +121,19 @@ class WiFiClientSecureCtx : public WiFiClient {
bool setCiphers(const std::vector<uint16_t>& list);
bool setCiphersLessSecure(); // Only use the limited set of RSA ciphers without EC
// peek buffer API is present
virtual bool hasPeekBufferAPI () const override { return true; }
// return number of byte accessible by peekBuffer()
virtual size_t peekAvailable () override { return WiFiClientSecureCtx::available(); }
// return a pointer to available data buffer (size = peekAvailable())
// semantic forbids any kind of read() before calling peekConsume()
virtual const char* peekBuffer () override;
// consume bytes after use (see peekBuffer)
virtual void peekConsume (size_t consume) override;
protected:
bool _connectSSL(const char *hostName); // Do initial SSL handshake
@ -287,6 +301,19 @@ class WiFiClientSecure : public WiFiClient {
static bool probeMaxFragmentLength(const char *hostname, uint16_t port, uint16_t len);
static bool probeMaxFragmentLength(const String& host, uint16_t port, uint16_t len);
// peek buffer API is present
virtual bool hasPeekBufferAPI () const override { return true; }
// return number of byte accessible by peekBuffer()
virtual size_t peekAvailable () override { return _ctx->available(); }
// return a pointer to available data buffer (size = peekAvailable())
// semantic forbids any kind of read() before calling peekConsume()
virtual const char* peekBuffer () override { return _ctx->peekBuffer(); }
// consume bytes after use (see peekBuffer)
virtual void peekConsume (size_t consume) override { return _ctx->peekConsume(consume); }
private:
std::shared_ptr<WiFiClientSecureCtx> _ctx;

View File

@ -29,7 +29,8 @@ typedef void (*discard_cb_t)(void*, ClientContext*);
extern "C" void esp_yield();
extern "C" void esp_schedule();
#include "DataSource.h"
#include <assert.h>
#include <StreamDev.h>
bool getDefaultPrivateGlobalSyncValue ();
@ -374,7 +375,8 @@ public:
if (!_pcb) {
return 0;
}
return _write_from_source(new BufferDataSource(data, size));
StreamConstPtr ptr(data, size);
return _write_from_source(&ptr);
}
size_t write(Stream& stream)
@ -382,7 +384,7 @@ public:
if (!_pcb) {
return 0;
}
return _write_from_source(new BufferedStreamDataSource<Stream>(stream, stream.available()));
return _write_from_source(&stream);
}
size_t write_P(PGM_P buf, size_t size)
@ -390,8 +392,8 @@ public:
if (!_pcb) {
return 0;
}
ProgmemStream stream(buf, size);
return _write_from_source(new BufferedStreamDataSource<ProgmemStream>(stream, size));
StreamConstPtr ptr(buf, size);
return _write_from_source(&ptr);
}
void keepAlive (uint16_t idle_sec = TCP_DEFAULT_KEEPALIVE_IDLE_SEC, uint16_t intv_sec = TCP_DEFAULT_KEEPALIVE_INTERVAL_SEC, uint8_t count = TCP_DEFAULT_KEEPALIVE_COUNT)
@ -436,6 +438,29 @@ public:
_sync = sync;
}
// return a pointer to available data buffer (size = peekAvailable())
// semantic forbids any kind of read() before calling peekConsume()
const char* peekBuffer ()
{
if (!_rx_buf)
return nullptr;
return (const char*)_rx_buf->payload + _rx_buf_offset;
}
// return number of byte accessible by peekBuffer()
size_t peekAvailable ()
{
if (!_rx_buf)
return 0;
return _rx_buf->len - _rx_buf_offset;
}
// consume bytes after use (see peekBuffer)
void peekConsume (size_t consume)
{
_consume(consume);
}
protected:
bool _is_timeout()
@ -452,7 +477,7 @@ protected:
}
}
size_t _write_from_source(DataSource* ds)
size_t _write_from_source(Stream* ds)
{
assert(_datasource == nullptr);
assert(!_send_waiting);
@ -468,7 +493,6 @@ protected:
if (_is_timeout()) {
DEBUGV(":wtmo\r\n");
}
delete _datasource;
_datasource = nullptr;
break;
}
@ -495,20 +519,20 @@ protected:
return false;
}
DEBUGV(":wr %d %d\r\n", _datasource->available(), _written);
DEBUGV(":wr %d %d\r\n", _datasource->peekAvailable(), _written);
bool has_written = false;
while (_datasource) {
if (state() == CLOSED)
return false;
size_t next_chunk_size = std::min((size_t)tcp_sndbuf(_pcb), _datasource->available());
size_t next_chunk_size = std::min((size_t)tcp_sndbuf(_pcb), _datasource->peekAvailable());
if (!next_chunk_size)
break;
const uint8_t* buf = _datasource->get_buffer(next_chunk_size);
const char* buf = _datasource->peekBuffer();
uint8_t flags = 0;
if (next_chunk_size < _datasource->available())
if (next_chunk_size < _datasource->peekAvailable())
// PUSH is meant for peer, telling to give data to user app as soon as received
// PUSH "may be set" when sender has finished sending a "meaningful" data block
// PUSH does not break Nagle
@ -522,15 +546,15 @@ protected:
err_t err = tcp_write(_pcb, buf, next_chunk_size, flags);
DEBUGV(":wrc %d %d %d\r\n", next_chunk_size, _datasource->available(), (int)err);
DEBUGV(":wrc %d %d %d\r\n", next_chunk_size, _datasource->peekAvailable(), (int)err);
if (err == ERR_OK) {
_datasource->release_buffer(buf, next_chunk_size);
_datasource->peekConsume(next_chunk_size);
_written += next_chunk_size;
has_written = true;
} else {
// ERR_MEM(-1) is a valid error meaning
// "come back later". It leaves state() opened
// ERR_MEM(-1) is a valid error meaning
// "come back later". It leaves state() opened
break;
}
}
@ -565,8 +589,6 @@ protected:
void _consume(size_t size)
{
if(_pcb)
tcp_recved(_pcb, size);
ptrdiff_t left = _rx_buf->len - _rx_buf_offset - size;
if(left > 0) {
_rx_buf_offset += size;
@ -583,6 +605,8 @@ protected:
pbuf_ref(_rx_buf);
pbuf_free(head);
}
if(_pcb)
tcp_recved(_pcb, size);
}
err_t _recv(tcp_pcb* pcb, pbuf* pb, err_t err)
@ -683,7 +707,7 @@ private:
discard_cb_t _discard_cb;
void* _discard_cb_arg;
DataSource* _datasource = nullptr;
Stream* _datasource = nullptr;
size_t _written = 0;
uint32_t _timeout_ms = 5000;
uint32_t _op_start_time = 0;

View File

@ -1,154 +0,0 @@
/* DataSource.h - a read-only object similar to Stream, but with less methods
* Copyright (c) 2016 Ivan Grokhotkov. All rights reserved.
* This file is distributed under MIT license.
*/
#ifndef DATASOURCE_H
#define DATASOURCE_H
#include <assert.h>
class DataSource {
public:
virtual ~DataSource() {}
virtual size_t available() = 0;
virtual const uint8_t* get_buffer(size_t size) = 0;
virtual void release_buffer(const uint8_t* buffer, size_t size) = 0;
};
class BufferDataSource : public DataSource {
public:
BufferDataSource(const uint8_t* data, size_t size) :
_data(data),
_size(size)
{
}
size_t available() override
{
return _size - _pos;
}
const uint8_t* get_buffer(size_t size) override
{
(void)size;
assert(_pos + size <= _size);
return _data + _pos;
}
void release_buffer(const uint8_t* buffer, size_t size) override
{
(void)buffer;
assert(buffer == _data + _pos);
_pos += size;
}
protected:
const uint8_t* _data;
const size_t _size;
size_t _pos = 0;
};
template<typename TStream>
class BufferedStreamDataSource : public DataSource {
public:
BufferedStreamDataSource(TStream& stream, size_t size) :
_stream(stream),
_size(size)
{
}
size_t available() override
{
return _size - _pos;
}
const uint8_t* get_buffer(size_t size) override
{
assert(_pos + size <= _size);
//Data that was already read from the stream but not released (e.g. if tcp_write error occured). Otherwise this should be 0.
const size_t stream_read = _streamPos - _pos;
//Min required buffer size: max(requested size, previous stream data already in buffer)
const size_t min_buffer_size = size > stream_read ? size : stream_read;
//Buffer too small?
if (_bufferSize < min_buffer_size) {
uint8_t *new_buffer = new uint8_t[min_buffer_size];
//If stream reading is ahead, than some data is already in the old buffer and needs to be copied to new resized buffer
if (_buffer && stream_read > 0) {
memcpy(new_buffer, _buffer.get(), stream_read);
}
_buffer.reset(new_buffer);
_bufferSize = min_buffer_size;
}
//Fetch remaining data from stream
//If error in tcp_write in ClientContext::_write_some() occured earlier and therefore release_buffer was not called last time, than the requested stream data is already in the buffer.
if (size > stream_read) {
//Remaining bytes to read from stream
const size_t stream_rem = size - stream_read;
const size_t cb = _stream.readBytes(reinterpret_cast<char*>(_buffer.get() + stream_read), stream_rem);
assert(cb == stream_rem);
(void)cb;
_streamPos += stream_rem;
}
return _buffer.get();
}
void release_buffer(const uint8_t* buffer, size_t size) override
{
if (size == 0) {
return;
}
(void)buffer;
_pos += size;
//Cannot release more than acquired through get_buffer
assert(_pos <= _streamPos);
//Release less than requested with get_buffer?
if (_pos < _streamPos) {
// Move unreleased stream data in buffer to front
assert(_buffer);
memmove(_buffer.get(), _buffer.get() + size, _streamPos - _pos);
}
}
protected:
TStream & _stream;
std::unique_ptr<uint8_t[]> _buffer;
size_t _size;
size_t _pos = 0;
size_t _bufferSize = 0;
size_t _streamPos = 0;
};
class ProgmemStream
{
public:
ProgmemStream(PGM_P buf, size_t size) :
_buf(buf),
_left(size)
{
}
size_t readBytes(char* dst, size_t size)
{
size_t will_read = (_left < size) ? _left : size;
memcpy_P((void*)dst, (PGM_VOID_P)_buf, will_read);
_left -= will_read;
_buf += will_read;
return will_read;
}
protected:
PGM_P _buf;
size_t _left;
};
#endif //DATASOURCE_H