1
0
mirror of synced 2025-04-21 22:25:55 +03:00

Improved DataSink interface

This commit is contained in:
yhirose 2020-01-05 23:59:54 -05:00
parent c58fca5dba
commit 96e9ec0663
3 changed files with 140 additions and 84 deletions

View File

@ -129,9 +129,9 @@ svr.Get("/stream", [&](const Request &req, Response &res) {
res.set_content_provider( res.set_content_provider(
data->size(), // Content length data->size(), // Content length
[data](uint64_t offset, uint64_t length, DataSink sink) { [data](uint64_t offset, uint64_t length, DataSink &sink) {
const auto &d = *data; const auto &d = *data;
sink(&d[offset], std::min(length, DATA_CHUNK_SIZE)); sink.write(&d[offset], std::min(length, DATA_CHUNK_SIZE));
}, },
[data] { delete data; }); [data] { delete data; });
}); });
@ -169,11 +169,11 @@ svr.Post("/content_receiver",
```cpp ```cpp
svr.Get("/chunked", [&](const Request& req, Response& res) { svr.Get("/chunked", [&](const Request& req, Response& res) {
res.set_chunked_content_provider( res.set_chunked_content_provider(
[](uint64_t offset, DataSink sink, Done done) { [](uint64_t offset, DataSink &sink) {
sink("123", 3); sink.write("123", 3);
sink("345", 3); sink.write("345", 3);
sink("789", 3); sink.write("789", 3);
done(); sink.done();
} }
); );
}); });

141
httplib.h
View File

@ -195,16 +195,6 @@ using Headers = std::multimap<std::string, std::string, detail::ci>;
using Params = std::multimap<std::string, std::string>; using Params = std::multimap<std::string, std::string>;
using Match = std::smatch; using Match = std::smatch;
using DataSink = std::function<void(const char *data, size_t data_len)>;
using Done = std::function<void()>;
using ContentProvider =
std::function<void(size_t offset, size_t length, DataSink sink)>;
using ContentProviderWithCloser =
std::function<void(size_t offset, size_t length, DataSink sink, Done done)>;
using Progress = std::function<bool(uint64_t current, uint64_t total)>; using Progress = std::function<bool(uint64_t current, uint64_t total)>;
struct Response; struct Response;
@ -219,6 +209,20 @@ struct MultipartFormData {
using MultipartFormDataItems = std::vector<MultipartFormData>; using MultipartFormDataItems = std::vector<MultipartFormData>;
using MultipartFormDataMap = std::multimap<std::string, MultipartFormData>; using MultipartFormDataMap = std::multimap<std::string, MultipartFormData>;
class DataSink {
public:
DataSink() = default;
DataSink(const DataSink &) = delete;
DataSink(const DataSink &&) = delete;
std::function<void(const char *data, size_t data_len)> write;
std::function<void()> done;
// TODO: std::function<bool()> is_alive;
};
using ContentProvider =
std::function<void(size_t offset, size_t length, DataSink &sink)>;
using ContentReceiver = using ContentReceiver =
std::function<bool(const char *data, size_t data_length)>; std::function<bool(const char *data, size_t data_length)>;
@ -310,11 +314,12 @@ struct Response {
void set_content_provider( void set_content_provider(
size_t length, size_t length,
std::function<void(size_t offset, size_t length, DataSink sink)> provider, std::function<void(size_t offset, size_t length, DataSink &sink)>
provider,
std::function<void()> resource_releaser = [] {}); std::function<void()> resource_releaser = [] {});
void set_chunked_content_provider( void set_chunked_content_provider(
std::function<void(size_t offset, DataSink sink, Done done)> provider, std::function<void(size_t offset, DataSink &sink)> provider,
std::function<void()> resource_releaser = [] {}); std::function<void()> resource_releaser = [] {});
Response() : status(-1), content_length(0) {} Response() : status(-1), content_length(0) {}
@ -327,7 +332,7 @@ struct Response {
// private members... // private members...
size_t content_length; size_t content_length;
ContentProviderWithCloser content_provider; ContentProvider content_provider;
std::function<void()> content_provider_resource_releaser; std::function<void()> content_provider_resource_releaser;
}; };
@ -1876,47 +1881,69 @@ inline int write_headers(Stream &strm, const T &info, const Headers &headers) {
return write_len; return write_len;
} }
inline ssize_t write_content(Stream &strm, inline ssize_t write_content(Stream &strm, ContentProvider content_provider,
ContentProviderWithCloser content_provider,
size_t offset, size_t length) { size_t offset, size_t length) {
size_t begin_offset = offset; size_t begin_offset = offset;
size_t end_offset = offset + length; size_t end_offset = offset + length;
while (offset < end_offset) { while (offset < end_offset) {
ssize_t written_length = 0; ssize_t written_length = 0;
content_provider(
offset, end_offset - offset, DataSink data_sink;
[&](const char *d, size_t l) { data_sink.write = [&](const char *d, size_t l) {
offset += l; offset += l;
written_length = strm.write(d, l); written_length = strm.write(d, l);
}, };
[&](void) { written_length = -1; }); data_sink.done = [&](void) { written_length = -1; };
content_provider(offset, end_offset - offset,
// [&](const char *d, size_t l) {
// offset += l;
// written_length = strm.write(d, l);
// },
// [&](void) { written_length = -1; }
data_sink);
if (written_length < 0) { return written_length; } if (written_length < 0) { return written_length; }
} }
return static_cast<ssize_t>(offset - begin_offset); return static_cast<ssize_t>(offset - begin_offset);
} }
inline ssize_t inline ssize_t write_content_chunked(Stream &strm,
write_content_chunked(Stream &strm, ContentProvider content_provider) {
ContentProviderWithCloser content_provider) {
size_t offset = 0; size_t offset = 0;
auto data_available = true; auto data_available = true;
ssize_t total_written_length = 0; ssize_t total_written_length = 0;
while (data_available) { while (data_available) {
ssize_t written_length = 0; ssize_t written_length = 0;
DataSink data_sink;
data_sink.write = [&](const char *d, size_t l) {
data_available = l > 0;
offset += l;
// Emit chunked response header and footer for each chunk
auto chunk = from_i_to_hex(l) + "\r\n" + std::string(d, l) + "\r\n";
written_length = strm.write(chunk);
};
data_sink.done = [&](void) {
data_available = false;
written_length = strm.write("0\r\n\r\n");
};
content_provider( content_provider(
offset, 0, offset, 0,
[&](const char *d, size_t l) { // [&](const char *d, size_t l) {
data_available = l > 0; // data_available = l > 0;
offset += l; // offset += l;
//
// Emit chunked response header and footer for each chunk // // Emit chunked response header and footer for each chunk
auto chunk = from_i_to_hex(l) + "\r\n" + std::string(d, l) + "\r\n"; // auto chunk = from_i_to_hex(l) + "\r\n" + std::string(d, l) +
written_length = strm.write(chunk); // "\r\n"; written_length = strm.write(chunk);
}, // },
[&](void) { // [&](void) {
data_available = false; // data_available = false;
written_length = strm.write("0\r\n\r\n"); // written_length = strm.write("0\r\n\r\n");
}); // }
data_sink);
if (written_length < 0) { return written_length; } if (written_length < 0) { return written_length; }
total_written_length += written_length; total_written_length += written_length;
@ -2652,21 +2679,23 @@ inline void Response::set_content(const std::string &s,
inline void Response::set_content_provider( inline void Response::set_content_provider(
size_t length, size_t length,
std::function<void(size_t offset, size_t length, DataSink sink)> provider, std::function<void(size_t offset, size_t length, DataSink &sink)> provider,
std::function<void()> resource_releaser) { std::function<void()> resource_releaser) {
assert(length > 0); assert(length > 0);
content_length = length; content_length = length;
content_provider = [provider](size_t offset, size_t length, DataSink sink, content_provider = [provider](size_t offset, size_t length, DataSink &sink) {
Done) { provider(offset, length, sink); }; provider(offset, length, sink);
};
content_provider_resource_releaser = resource_releaser; content_provider_resource_releaser = resource_releaser;
} }
inline void Response::set_chunked_content_provider( inline void Response::set_chunked_content_provider(
std::function<void(size_t offset, DataSink sink, Done done)> provider, std::function<void(size_t offset, DataSink &sink)> provider,
std::function<void()> resource_releaser) { std::function<void()> resource_releaser) {
content_length = 0; content_length = 0;
content_provider = [provider](size_t offset, size_t, DataSink sink, content_provider = [provider](size_t offset, size_t, DataSink &sink) {
Done done) { provider(offset, sink, done); }; provider(offset, sink);
};
content_provider_resource_releaser = resource_releaser; content_provider_resource_releaser = resource_releaser;
} }
@ -3731,12 +3760,15 @@ inline bool Client::write_request(Stream &strm, const Request &req,
if (req.content_provider) { if (req.content_provider) {
size_t offset = 0; size_t offset = 0;
size_t end_offset = req.content_length; size_t end_offset = req.content_length;
DataSink data_sink;
data_sink.write = [&](const char *d, size_t l) {
auto written_length = strm.write(d, l);
offset += written_length;
};
while (offset < end_offset) { while (offset < end_offset) {
req.content_provider(offset, end_offset - offset, req.content_provider(offset, end_offset - offset, data_sink);
[&](const char *d, size_t l) {
auto written_length = strm.write(d, l);
offset += written_length;
});
} }
} }
} else { } else {
@ -3761,12 +3793,15 @@ inline std::shared_ptr<Response> Client::send_with_content_provider(
if (compress_) { if (compress_) {
if (content_provider) { if (content_provider) {
size_t offset = 0; size_t offset = 0;
DataSink data_sink;
data_sink.write = [&](const char *data, size_t data_len) {
req.body.append(data, data_len);
offset += data_len;
};
while (offset < content_length) { while (offset < content_length) {
content_provider(offset, content_length - offset, content_provider(offset, content_length - offset, data_sink);
[&](const char *data, size_t data_len) {
req.body.append(data, data_len);
offset += data_len;
});
} }
} else { } else {
req.body = body; req.body = body;

View File

@ -697,18 +697,33 @@ protected:
.Get("/streamed-chunked", .Get("/streamed-chunked",
[&](const Request & /*req*/, Response &res) { [&](const Request & /*req*/, Response &res) {
res.set_chunked_content_provider( res.set_chunked_content_provider(
[](uint64_t /*offset*/, DataSink sink, Done done) { [](uint64_t /*offset*/, DataSink &sink) {
sink("123", 3); sink.write("123", 3);
sink("456", 3); sink.write("456", 3);
sink("789", 3); sink.write("789", 3);
done(); sink.done();
}); });
}) })
.Get("/streamed-chunked2",
[&](const Request & /*req*/, Response &res) {
auto i = new int(0);
res.set_chunked_content_provider(
[i](uint64_t /*offset*/, DataSink &sink) {
switch (*i) {
case 0: sink.write("123", 3); break;
case 1: sink.write("456", 3); break;
case 2: sink.write("789", 3); break;
case 3: sink.done(); break;
}
(*i)++;
},
[i] { delete i; });
})
.Get("/streamed", .Get("/streamed",
[&](const Request & /*req*/, Response &res) { [&](const Request & /*req*/, Response &res) {
res.set_content_provider( res.set_content_provider(
6, [](uint64_t offset, uint64_t /*length*/, DataSink sink) { 6, [](uint64_t offset, uint64_t /*length*/, DataSink &sink) {
sink(offset < 3 ? "a" : "b", 1); sink.write(offset < 3 ? "a" : "b", 1);
}); });
}) })
.Get("/streamed-with-range", .Get("/streamed-with-range",
@ -716,23 +731,23 @@ protected:
auto data = new std::string("abcdefg"); auto data = new std::string("abcdefg");
res.set_content_provider( res.set_content_provider(
data->size(), data->size(),
[data](uint64_t offset, uint64_t length, DataSink sink) { [data](uint64_t offset, uint64_t length, DataSink &sink) {
size_t DATA_CHUNK_SIZE = 4; size_t DATA_CHUNK_SIZE = 4;
const auto &d = *data; const auto &d = *data;
auto out_len = auto out_len =
std::min(static_cast<size_t>(length), DATA_CHUNK_SIZE); std::min(static_cast<size_t>(length), DATA_CHUNK_SIZE);
sink(&d[static_cast<size_t>(offset)], out_len); sink.write(&d[static_cast<size_t>(offset)], out_len);
}, },
[data] { delete data; }); [data] { delete data; });
}) })
.Get("/streamed-cancel", .Get("/streamed-cancel",
[&](const Request & /*req*/, Response &res) { [&](const Request & /*req*/, Response &res) {
res.set_content_provider( res.set_content_provider(size_t(-1), [](uint64_t /*offset*/,
size_t(-1), uint64_t /*length*/,
[](uint64_t /*offset*/, uint64_t /*length*/, DataSink sink) { DataSink &sink) {
std::string data = "data_chunk"; std::string data = "data_chunk";
sink(data.data(), data.size()); sink.write(data.data(), data.size());
}); });
}) })
.Get("/with-range", .Get("/with-range",
[&](const Request & /*req*/, Response &res) { [&](const Request & /*req*/, Response &res) {
@ -1508,6 +1523,13 @@ TEST_F(ServerTest, GetStreamedChunked) {
EXPECT_EQ(std::string("123456789"), res->body); EXPECT_EQ(std::string("123456789"), res->body);
} }
TEST_F(ServerTest, GetStreamedChunked2) {
auto res = cli_.Get("/streamed-chunked2");
ASSERT_TRUE(res != nullptr);
EXPECT_EQ(200, res->status);
EXPECT_EQ(std::string("123456789"), res->body);
}
TEST_F(ServerTest, LargeChunkedPost) { TEST_F(ServerTest, LargeChunkedPost) {
Request req; Request req;
req.method = "POST"; req.method = "POST";
@ -1567,8 +1589,8 @@ TEST_F(ServerTest, Put) {
TEST_F(ServerTest, PutWithContentProvider) { TEST_F(ServerTest, PutWithContentProvider) {
auto res = cli_.Put( auto res = cli_.Put(
"/put", 3, "/put", 3,
[](size_t /*offset*/, size_t /*length*/, DataSink sink) { [](size_t /*offset*/, size_t /*length*/, DataSink &sink) {
sink("PUT", 3); sink.write("PUT", 3);
}, },
"text/plain"); "text/plain");
@ -1582,8 +1604,8 @@ TEST_F(ServerTest, PutWithContentProviderWithGzip) {
cli_.set_compress(true); cli_.set_compress(true);
auto res = cli_.Put( auto res = cli_.Put(
"/put", 3, "/put", 3,
[](size_t /*offset*/, size_t /*length*/, DataSink sink) { [](size_t /*offset*/, size_t /*length*/, DataSink &sink) {
sink("PUT", 3); sink.write("PUT", 3);
}, },
"text/plain"); "text/plain");
@ -1689,7 +1711,8 @@ TEST_F(ServerTest, PatchContentReceiver) {
} }
TEST_F(ServerTest, PostQueryStringAndBody) { TEST_F(ServerTest, PostQueryStringAndBody) {
auto res = cli_.Post("/query-string-and-body?key=value", "content", "text/plain"); auto res =
cli_.Post("/query-string-and-body?key=value", "content", "text/plain");
ASSERT_TRUE(res != nullptr); ASSERT_TRUE(res != nullptr);
ASSERT_EQ(200, res->status); ASSERT_EQ(200, res->status);
} }
@ -2139,8 +2162,7 @@ TEST(SSLClientServerTest, ClientCertPresent) {
thread t = thread([&]() { ASSERT_TRUE(svr.listen(HOST, PORT)); }); thread t = thread([&]() { ASSERT_TRUE(svr.listen(HOST, PORT)); });
msleep(1); msleep(1);
httplib::SSLClient cli(HOST, PORT, CLIENT_CERT_FILE, httplib::SSLClient cli(HOST, PORT, CLIENT_CERT_FILE, CLIENT_PRIVATE_KEY_FILE);
CLIENT_PRIVATE_KEY_FILE);
auto res = cli.Get("/test"); auto res = cli.Get("/test");
cli.set_timeout_sec(30); cli.set_timeout_sec(30);
ASSERT_TRUE(res != nullptr); ASSERT_TRUE(res != nullptr);
@ -2181,8 +2203,7 @@ TEST(SSLClientServerTest, TrustDirOptional) {
thread t = thread([&]() { ASSERT_TRUE(svr.listen(HOST, PORT)); }); thread t = thread([&]() { ASSERT_TRUE(svr.listen(HOST, PORT)); });
msleep(1); msleep(1);
httplib::SSLClient cli(HOST, PORT, CLIENT_CERT_FILE, httplib::SSLClient cli(HOST, PORT, CLIENT_CERT_FILE, CLIENT_PRIVATE_KEY_FILE);
CLIENT_PRIVATE_KEY_FILE);
auto res = cli.Get("/test"); auto res = cli.Get("/test");
cli.set_timeout_sec(30); cli.set_timeout_sec(30);
ASSERT_TRUE(res != nullptr); ASSERT_TRUE(res != nullptr);