Mirror of https://github.com/facebookincubator/mvfst.git (synced 2025-08-02 13:06:47 +03:00)
Summary: This migrates the quic code to use quic::Expected instead of folly::Expected. quic::Expected is a vendored wrapper around expected-lite, which itself matches std::expected. std::expected is not yet available to us, but once it is, we will be able to simplify further to the std version. This migration is almost entirely mechanical.

Reviewed By: kvtsoy

Differential Revision: D76488955

fbshipit-source-id: 92b9cbeac85a28722a6180464b47d84696b1e81b
1473 lines · 49 KiB · C++
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include <quic/api/QuicBatchWriter.h>
#include <quic/api/QuicBatchWriterFactory.h>
#include <quic/common/events/FollyQuicEventBase.h>
#include <quic/common/test/TestUtils.h>
#include <quic/common/udpsocket/FollyQuicAsyncUDPSocket.h>

#include <gtest/gtest.h>
#include <quic/common/testutil/MockAsyncUDPSocket.h>
#include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
#include <quic/server/state/ServerStateMachine.h>

using namespace testing;

namespace quic::testing {

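// Test payload sizes. kStrLen is the baseline packet size used by most tests;
// kStrLenGT is intentionally larger and kStrLenLT intentionally smaller, so
// tests can observe how a batch writer reacts when a packet is bigger or
// smaller than the packets already batched (e.g. forcing a flush under GSO).
// kBatchNum and kNumLoops bound the batch size and the number of repetitions.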
constexpr const auto kStrLen = 10;
constexpr const auto kStrLenGT = 20;
constexpr const auto kStrLenLT = 5;
constexpr const auto kBatchNum = 3;
constexpr const auto kNumLoops = 10;

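// Shared fixture: provides a server connection state for the batch writer
// factory and a flag recording whether the socket under test supports GSO.
// GSO-dependent tests probe the real socket for support at runtime.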
struct QuicBatchWriterTest : public ::testing::Test {
  QuicBatchWriterTest()
      : conn_(FizzServerQuicHandshakeContext::Builder().build()) {}

 protected:
  QuicServerConnectionState conn_;
  bool gsoSupported_{false};
};

TEST_F(QuicBatchWriterTest, TestBatchingNone) {
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_NONE,
      kBatchNum,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  std::string strTest(kStrLen, 'A');

  // run multiple loops
  for (size_t i = 0; i < kNumLoops; i++) {
    CHECK(batchWriter->empty());
    CHECK_EQ(batchWriter->size(), 0);
    auto buf = folly::IOBuf::copyBuffer(strTest.c_str(), kStrLen);

    CHECK(batchWriter->append(
        std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
    CHECK_EQ(batchWriter->size(), kStrLen);
    batchWriter->reset();
  }
}

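// The GSO tests below first probe a real bound UDP socket for generic
// segmentation offload support. When GSO is unavailable the factory falls
// back to a regular batch writer, so those tests only exercise the basic
// append path in that case.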
TEST_F(QuicBatchWriterTest, TestBatchingGSOBase) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  FollyQuicAsyncUDPSocket sock(qEvb);
  auto ret = sock.setReuseAddr(false);
  ASSERT_FALSE(ret.hasError());
  ASSERT_FALSE(sock.bind(folly::SocketAddress("127.0.0.1", 0)).hasError());
  auto gsoResult = sock.getGSO();
  ASSERT_FALSE(gsoResult.hasError());
  gsoSupported_ = gsoResult.value();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      1,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  std::string strTest(kStrLen, 'A');
  // if GSO is not available, just test we've got a regular
  // batch writer
  if (!gsoSupported_) {
    CHECK(batchWriter->empty());
    CHECK_EQ(batchWriter->size(), 0);
    auto buf = folly::IOBuf::copyBuffer(strTest);
    CHECK(batchWriter->append(
        std::move(buf), strTest.size(), folly::SocketAddress(), nullptr));
    EXPECT_FALSE(batchWriter->needsFlush(kStrLenLT));
  }
}

TEST_F(QuicBatchWriterTest, TestBatchingGSOLastSmallPacket) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  FollyQuicAsyncUDPSocket sock(qEvb);
  auto ret = sock.setReuseAddr(false);
  ASSERT_FALSE(ret.hasError());
  ASSERT_FALSE(sock.bind(folly::SocketAddress("127.0.0.1", 0)).hasError());
  auto gsoResult = sock.getGSO();
  ASSERT_FALSE(gsoResult.hasError());
  gsoSupported_ = gsoResult.value();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      1,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  std::string strTest;
  // only if GSO is available
  if (gsoSupported_) {
    // run multiple loops
    for (size_t i = 0; i < kNumLoops; i++) {
      // batch kStrLen, kStrLenLT
      CHECK(batchWriter->empty());
      CHECK_EQ(batchWriter->size(), 0);
      strTest = std::string(kStrLen, 'A');
      auto buf = folly::IOBuf::copyBuffer(strTest);
      EXPECT_FALSE(batchWriter->needsFlush(kStrLen));
      EXPECT_FALSE(batchWriter->append(
          std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
      CHECK_EQ(batchWriter->size(), kStrLen);
      strTest = std::string(kStrLenLT, 'A');
      buf = folly::IOBuf::copyBuffer(strTest);
      EXPECT_FALSE(batchWriter->needsFlush(kStrLenLT));
      CHECK(batchWriter->append(
          std::move(buf), kStrLenLT, folly::SocketAddress(), nullptr));
      CHECK_EQ(batchWriter->size(), kStrLen + kStrLenLT);
      batchWriter->reset();
    }
  }
}

TEST_F(QuicBatchWriterTest, TestBatchingGSOLastBigPacket) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  FollyQuicAsyncUDPSocket sock(qEvb);
  auto ret = sock.setReuseAddr(false);
  ASSERT_FALSE(ret.hasError());
  ASSERT_FALSE(sock.bind(folly::SocketAddress("127.0.0.1", 0)).hasError());
  auto gsoResult = sock.getGSO();
  ASSERT_FALSE(gsoResult.hasError());
  gsoSupported_ = gsoResult.value();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      1,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  std::string strTest;
  // only if GSO is available
  if (gsoSupported_) {
    // run multiple loops
    for (size_t i = 0; i < kNumLoops; i++) {
      // try to batch kStrLen, kStrLenGT
      CHECK(batchWriter->empty());
      CHECK_EQ(batchWriter->size(), 0);
      strTest = std::string(kStrLen, 'A');
      auto buf = folly::IOBuf::copyBuffer(strTest);
      EXPECT_FALSE(batchWriter->needsFlush(kStrLen));
      EXPECT_FALSE(batchWriter->append(
          std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
      CHECK_EQ(batchWriter->size(), kStrLen);
      CHECK(batchWriter->needsFlush(kStrLenGT));
      batchWriter->reset();
    }
  }
}

TEST_F(QuicBatchWriterTest, TestBatchingGSOBatchNum) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  FollyQuicAsyncUDPSocket sock(qEvb);
  auto ret = sock.setReuseAddr(false);
  ASSERT_FALSE(ret.hasError());
  ASSERT_FALSE(sock.bind(folly::SocketAddress("127.0.0.1", 0)).hasError());
  auto gsoResult = sock.getGSO();
  ASSERT_FALSE(gsoResult.hasError());
  gsoSupported_ = gsoResult.value();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      kBatchNum,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  std::string strTest(kStrLen, 'A');
  // only if GSO is available
  if (gsoSupported_) {
    // run multiple loops
    for (size_t i = 0; i < kNumLoops; i++) {
      // try to batch up to kBatchNum
      CHECK(batchWriter->empty());
      CHECK_EQ(batchWriter->size(), 0);
      size_t size = 0;
      for (auto j = 0; j < kBatchNum - 1; j++) {
        auto buf = folly::IOBuf::copyBuffer(strTest);
        EXPECT_FALSE(batchWriter->append(
            std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
        size += kStrLen;
        CHECK_EQ(batchWriter->size(), size);
      }

      // add the kBatchNum buf
      auto buf = folly::IOBuf::copyBuffer(strTest.c_str(), kStrLen);
      CHECK(batchWriter->append(
          std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
      size += kStrLen;
      CHECK_EQ(batchWriter->size(), size);
      batchWriter->reset();
    }
  }
}

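// SENDMMSG mode collects individually addressed packets and flushes them with
// a single writem() call on the socket, as the mocked socket expectations in
// the following tests show.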
TEST_F(QuicBatchWriterTest, TestBatchingSendmmsg) {
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG,
      kBatchNum,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  std::string strTest(kStrLen, 'A');

  // run multiple loops
  for (size_t i = 0; i < kNumLoops; i++) {
    // try to batch up to kBatchNum
    CHECK(batchWriter->empty());
    CHECK_EQ(batchWriter->size(), 0);
    size_t size = 0;
    for (auto j = 0; j < kBatchNum - 1; j++) {
      auto buf = folly::IOBuf::copyBuffer(strTest);
      EXPECT_FALSE(batchWriter->append(
          std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
      size += kStrLen;
      CHECK_EQ(batchWriter->size(), size);
    }

    // add the kBatchNum buf
    auto buf = folly::IOBuf::copyBuffer(strTest.c_str(), kStrLen);
    CHECK(batchWriter->append(
        std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
    size += kStrLen;
    CHECK_EQ(batchWriter->size(), size);
    batchWriter->reset();
  }
}

TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgInplaceIovecMatches) {
  // In this test case, we don't surpass the kNumIovecBufferChains limit
  // (i.e. the number of contiguous buffers we are sending)
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG,
      kBatchNum,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);

  std::vector<std::string> messages{"It", "is", "sunny!"};

  CHECK(batchWriter->empty());
  CHECK_EQ(batchWriter->size(), 0);
  size_t size = 0;
  for (auto& message : messages) {
    auto buf = folly::IOBuf::copyBuffer(
        ByteRange((unsigned char*)message.data(), message.size()));
    batchWriter->append(
        std::move(buf), message.size(), folly::SocketAddress(), nullptr);
    size += message.size();
    CHECK_EQ(batchWriter->size(), size);
  }

  EXPECT_CALL(sock, writem(_, _, _, _))
      .Times(1)
      .WillOnce(Invoke([&](folly::Range<folly::SocketAddress const*> addrs,
                           iovec* iovecs,
                           size_t* messageSizes,
                           size_t count) {
        EXPECT_EQ(addrs.size(), 1);
        EXPECT_EQ(count, messages.size());

        size_t currentIovIndex = 0;
        for (size_t i = 0; i < count; i++) {
          auto wrappedIovBuffer =
              folly::IOBuf::wrapIov(iovecs + currentIovIndex, messageSizes[i]);
          currentIovIndex += messageSizes[i];

          folly::IOBufEqualTo eq;
          EXPECT_TRUE(
              eq(wrappedIovBuffer,
                 folly::IOBuf::copyBuffer(ByteRange(
                     (unsigned char*)messages[i].data(), messages[i].size()))));
        }

        return 0;
      }));

  batchWriter->write(sock, folly::SocketAddress());
}

TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgNewlyAllocatedIovecMatches) {
  // In this test case, we surpass the kNumIovecBufferChains limit
  // (i.e. the number of contiguous buffers we are sending)
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG,
      kBatchNum,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);

  std::vector<std::vector<std::string>> messages{
      {"It", "is", "sunny!"},
      {"but", "it", "is", "so", "cold"},
      {"my",
       "jacket",
       "isn't",
       "warm",
       "enough",
       "and",
       "my",
       "hands",
       "are",
       "freezing"}};

  CHECK(batchWriter->empty());
  CHECK_EQ(batchWriter->size(), 0);

  std::vector<BufPtr> buffers;

  size_t size = 0;
  for (auto& message : messages) {
    auto buf = std::make_unique<folly::IOBuf>();
    for (size_t j = 0; j < message.size(); j++) {
      auto partBuf = folly::IOBuf::copyBuffer(
          ByteRange((unsigned char*)message[j].data(), message[j].size()));
      buf->appendToChain(std::move(partBuf));
    }
    buffers.emplace_back(std::move(buf));
  }

  for (size_t i = 0; i < messages.size(); i++) {
    batchWriter->append(
        buffers[i]->clone(),
        buffers[i]->computeChainDataLength(),
        folly::SocketAddress(),
        nullptr);
    size += buffers[i]->computeChainDataLength();
    CHECK_EQ(batchWriter->size(), size);
  }

  EXPECT_CALL(sock, writem(_, _, _, _))
      .Times(1)
      .WillOnce(Invoke([&](folly::Range<folly::SocketAddress const*> addrs,
                           iovec* iovecs,
                           size_t* messageSizes,
                           size_t count) {
        EXPECT_EQ(addrs.size(), 1);
        EXPECT_EQ(count, messages.size());

        size_t currentIovIndex = 0;
        for (size_t i = 0; i < count; i++) {
          auto wrappedIovBuffer =
              folly::IOBuf::wrapIov(iovecs + currentIovIndex, messageSizes[i]);
          currentIovIndex += messageSizes[i];

          folly::IOBufEqualTo eq;
          EXPECT_TRUE(eq(wrappedIovBuffer, buffers[i]));
        }

        return 0;
      }));

  batchWriter->write(sock, folly::SocketAddress());
}

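// The Inplace variant exercises the ContinuousMemory data path: packets are
// written directly into the connection's shared BufAccessor buffer, so
// append() is called with a nullptr IOBuf and only the packet length.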
TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgInplace) {
  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * kBatchNum);
  conn_.bufAccessor = bufAccessor.get();

  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);

  gsoSupported_ = false;

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG,
      kBatchNum,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);

  // run multiple loops
  for (size_t i = 0; i < kNumLoops; i++) {
    std::vector<iovec> expectedIovecs;

    // try to batch up to kBatchNum
    CHECK(batchWriter->empty());
    CHECK_EQ(batchWriter->size(), 0);
    size_t size = 0;
    for (auto j = 0; j < kBatchNum - 1; j++) {
      iovec vec{};
      vec.iov_base = (void*)bufAccessor->buf()->tail();
      vec.iov_len = kStrLen;
      bufAccessor->buf()->append(kStrLen);
      expectedIovecs.push_back(vec);

      EXPECT_FALSE(batchWriter->append(
          nullptr, kStrLen, folly::SocketAddress(), nullptr));
      size += kStrLen;
      CHECK_EQ(batchWriter->size(), size);
    }

    // add the kBatchNum buf
    iovec vec{};
    vec.iov_base = (void*)bufAccessor->buf()->tail();
    vec.iov_len = kStrLen;
    bufAccessor->buf()->append(kStrLen);
    expectedIovecs.push_back(vec);

    CHECK(
        batchWriter->append(nullptr, kStrLen, folly::SocketAddress(), nullptr));
    size += kStrLen;
    CHECK_EQ(batchWriter->size(), size);

    EXPECT_CALL(sock, writem(_, _, _, _))
        .Times(1)
        .WillOnce(Invoke([&](folly::Range<folly::SocketAddress const*> addrs,
                             iovec* iovecs,
                             size_t* messageSizes,
                             size_t count) {
          EXPECT_EQ(addrs.size(), 1);
          EXPECT_EQ(count, kBatchNum);

          for (size_t k = 0; k < count; k++) {
            EXPECT_EQ(messageSizes[k], 1);
            EXPECT_EQ(expectedIovecs[k].iov_base, iovecs[k].iov_base);
            EXPECT_EQ(expectedIovecs[k].iov_len, iovecs[k].iov_len);
          }

          return 0;
        }));
    batchWriter->write(sock, folly::SocketAddress());
    expectedIovecs.clear();
    EXPECT_TRUE(bufAccessor->buf()->empty());

    batchWriter->reset();
  }
}

TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatchNum) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  FollyQuicAsyncUDPSocket sock(qEvb);
  auto ret = sock.setReuseAddr(false);
  ASSERT_FALSE(ret.hasError());
  ASSERT_FALSE(sock.bind(folly::SocketAddress("127.0.0.1", 0)).hasError());
  auto gsoResult = sock.getGSO();
  ASSERT_FALSE(gsoResult.hasError());
  gsoSupported_ = gsoResult.value();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
      kBatchNum,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  std::string strTest(kStrLen, 'A');
  // only if GSO is available
  if (gsoSupported_) {
    // run multiple loops
    for (size_t i = 0; i < kNumLoops; i++) {
      // try to batch up to kBatchNum
      CHECK(batchWriter->empty());
      CHECK_EQ(batchWriter->size(), 0);
      size_t size = 0;
      for (auto j = 0; j < kBatchNum - 1; j++) {
        auto buf = folly::IOBuf::copyBuffer(strTest);
        EXPECT_FALSE(batchWriter->append(
            std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
        size += kStrLen;
        CHECK_EQ(batchWriter->size(), size);
      }

      // add the kBatchNum buf
      auto buf = folly::IOBuf::copyBuffer(strTest.c_str(), kStrLen);
      CHECK(batchWriter->append(
          std::move(buf), kStrLen, folly::SocketAddress(), nullptr));
      size += kStrLen;
      CHECK_EQ(batchWriter->size(), size);
      batchWriter->reset();
    }
  }
}

TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatchBigSmallPacket) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  FollyQuicAsyncUDPSocket sock(qEvb);
  auto ret = sock.setReuseAddr(false);
  ASSERT_FALSE(ret.hasError());
  ASSERT_FALSE(sock.bind(folly::SocketAddress("127.0.0.1", 0)).hasError());
  auto gsoResult = sock.getGSO();
  ASSERT_FALSE(gsoResult.hasError());
  gsoSupported_ = gsoResult.value();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
      3 * kBatchNum,
      false, /* enable backpressure */
      DataPathType::ChainedMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  std::string strTest(kStrLen, 'A');
  // only if GSO is available
  if (gsoSupported_) {
    // run multiple loops
    for (size_t i = 0; i < kNumLoops; i++) {
      // try to batch up to kBatchNum
      CHECK(batchWriter->empty());
      CHECK_EQ(batchWriter->size(), 0);
      size_t size = 0;
      for (auto j = 0; j < 3 * kBatchNum - 1; j++) {
        strTest = (j % 3 == 0) ? std::string(kStrLen, 'A')
                               : ((j % 3 == 1) ? std::string(kStrLenLT, 'A')
                                               : std::string(kStrLenGT, 'A'));
        auto buf = folly::IOBuf::copyBuffer(strTest);
        // we can add various sizes without the need to flush until we add
        // the maxBufs buffer
        EXPECT_FALSE(batchWriter->append(
            std::move(buf), strTest.length(), folly::SocketAddress(), nullptr));
        size += strTest.length();
        CHECK_EQ(batchWriter->size(), size);
      }

      // add the kBatchNum buf
      auto buf = folly::IOBuf::copyBuffer(strTest);
      CHECK(batchWriter->append(
          std::move(buf), strTest.length(), folly::SocketAddress(), nullptr));
      size += strTest.length();
      CHECK_EQ(batchWriter->size(), size);
      batchWriter->reset();
    }
  }
}

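// The SENDMMSG_GSO in-place tests below check how the writer buckets packets
// for writemGSO(): consecutive packets with the same destination and size are
// grouped into one GSO buffer, and a size or address change starts a new one.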
// Test the case where we send 5 packets, all of the same size, to the
// same address.
TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOInplaceSameSizeAll) {
  gsoSupported_ = true;
  size_t batchSize = 5;
  size_t packetSize = 100;

  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);

  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  CHECK(batchWriter->empty());
  CHECK_EQ(batchWriter->size(), 0);
  size_t size = 0;
  for (size_t j = 0; j < batchSize - 1; j++) {
    bufAccessor->append(packetSize);
    EXPECT_FALSE(batchWriter->append(
        nullptr, packetSize, folly::SocketAddress(), nullptr));
    size += packetSize;
    EXPECT_EQ(batchWriter->size(), size);
  }
  bufAccessor->append(packetSize);
  EXPECT_TRUE(batchWriter->append(
      nullptr, packetSize, folly::SocketAddress(), nullptr));
  size += packetSize;
  EXPECT_EQ(batchWriter->size(), size);

  EXPECT_CALL(sock, writeGSO(_, _, _, _))
      .Times(1)
      .WillOnce(Invoke([&](const folly::SocketAddress&,
                           const struct iovec* iovecs,
                           size_t iovec_len,
                           QuicAsyncUDPSocket::WriteOptions writeOptions) {
        EXPECT_EQ(iovec_len, 5);
        EXPECT_EQ(writeOptions.gso, packetSize);

        for (uint32_t i = 0; i < 5; i++) {
          EXPECT_EQ(
              iovecs[i].iov_base,
              (uint8_t*)bufAccessor->buf()->buffer() + packetSize * i);
          EXPECT_EQ(iovecs[i].iov_len, packetSize);
        }

        return 1;
      }));
  batchWriter->write(sock, folly::SocketAddress());
  EXPECT_TRUE(bufAccessor->buf()->empty());
}

// Test the case where we do the following for the same address, in order:
// (1) Send 3 packets of the same size
// (2) Send 1 packet that's smaller than the previous 3
// (3) Send 1 packet of the same size as the 3 initial ones
TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOInplaceSmallerSizeInMiddle) {
  gsoSupported_ = true;
  size_t batchSize = 5;

  std::vector<size_t> packetSizes = {100, 100, 100, 70, 100};

  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);

  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  CHECK(batchWriter->empty());
  CHECK_EQ(batchWriter->size(), 0);
  size_t size = 0;
  for (size_t j = 0; j < batchSize - 1; j++) {
    bufAccessor->append(packetSizes[j]);
    EXPECT_FALSE(batchWriter->append(
        nullptr, packetSizes[j], folly::SocketAddress(), nullptr));
    size += packetSizes[j];
    EXPECT_EQ(batchWriter->size(), size);
  }
  bufAccessor->append(packetSizes[batchSize - 1]);
  EXPECT_TRUE(batchWriter->append(
      nullptr, packetSizes[batchSize - 1], folly::SocketAddress(), nullptr));
  size += packetSizes[batchSize - 1];
  EXPECT_EQ(batchWriter->size(), size);

  EXPECT_CALL(sock, writemGSO(_, _, _, _, _))
      .Times(1)
      .WillOnce(
          Invoke([&](folly::Range<folly::SocketAddress const*> /* addrs */,
                     iovec* iov,
                     size_t* numIovecsInBuffer,
                     size_t count,
                     const QuicAsyncUDPSocket::WriteOptions* options) {
            EXPECT_EQ(count, 2);
            EXPECT_EQ(numIovecsInBuffer[0], 4);
            EXPECT_EQ(numIovecsInBuffer[1], 1);
            EXPECT_EQ(options[0].gso, 100);
            // There's just one packet in the second series, so we don't use GSO
            // there.
            EXPECT_EQ(options[1].gso, 0);

            auto* currBufferAddr = (uint8_t*)bufAccessor->buf()->buffer();
            for (uint32_t i = 0; i < batchSize; i++) {
              EXPECT_EQ(iov[i].iov_base, currBufferAddr);
              EXPECT_EQ(iov[i].iov_len, packetSizes[i]);

              currBufferAddr += packetSizes[i];
            }

            return 2;
          }));
  batchWriter->write(sock, folly::SocketAddress());
  EXPECT_TRUE(bufAccessor->buf()->empty());
}

// Test the case where we do the following for the same address, in order:
// (1) Send 3 packets of the same size
// (2) Send 1 packet that's larger than the previous 3
// (3) Send 1 packet of the same size as the 3 initial ones
TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOInplaceLargerSizeInMiddle) {
  gsoSupported_ = true;
  size_t batchSize = 5;

  std::vector<size_t> packetSizes = {100, 100, 100, 120, 100};

  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);

  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  CHECK(batchWriter->empty());
  CHECK_EQ(batchWriter->size(), 0);
  size_t size = 0;
  for (size_t j = 0; j < batchSize - 1; j++) {
    bufAccessor->append(packetSizes[j]);
    EXPECT_FALSE(batchWriter->append(
        nullptr, packetSizes[j], folly::SocketAddress(), nullptr));
    size += packetSizes[j];
    EXPECT_EQ(batchWriter->size(), size);
  }
  bufAccessor->append(packetSizes[batchSize - 1]);
  EXPECT_TRUE(batchWriter->append(
      nullptr, packetSizes[batchSize - 1], folly::SocketAddress(), nullptr));
  size += packetSizes[batchSize - 1];
  EXPECT_EQ(batchWriter->size(), size);

  EXPECT_CALL(sock, writemGSO(_, _, _, _, _))
      .Times(1)
      .WillOnce(
          Invoke([&](folly::Range<folly::SocketAddress const*> /* addrs */,
                     iovec* iov,
                     size_t* numIovecsInBuffer,
                     size_t count,
                     const QuicAsyncUDPSocket::WriteOptions* options) {
            EXPECT_EQ(count, 2);
            EXPECT_EQ(numIovecsInBuffer[0], 3);
            EXPECT_EQ(numIovecsInBuffer[1], 2);
            EXPECT_EQ(options[0].gso, 100);
            EXPECT_EQ(options[1].gso, 120);

            auto* currBufferAddr = (uint8_t*)bufAccessor->buf()->buffer();
            for (uint32_t i = 0; i < batchSize; i++) {
              EXPECT_EQ(iov[i].iov_base, currBufferAddr);
              EXPECT_EQ(iov[i].iov_len, packetSizes[i]);

              currBufferAddr += packetSizes[i];
            }

            return 2;
          }));
  batchWriter->write(sock, folly::SocketAddress());
  EXPECT_TRUE(bufAccessor->buf()->empty());
}

// Send 5 packets of the same length.
// Packets 1, 2, and 5 are to address A.
// Packets 3 and 4 are to address B.
TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOInplaceDifferentAddrs) {
  folly::SocketAddress addrA("127.0.0.1", 80);
  folly::SocketAddress addrB("127.0.0.1", 443);

  std::vector<folly::SocketAddress> addrs = {addrA, addrA, addrB, addrB, addrA};

  gsoSupported_ = true;
  size_t batchSize = 5;
  size_t packetSize = 100;

  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);

  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  CHECK(batchWriter->empty());
  CHECK_EQ(batchWriter->size(), 0);
  size_t size = 0;
  for (size_t j = 0; j < batchSize - 1; j++) {
    bufAccessor->append(packetSize);
    EXPECT_FALSE(batchWriter->append(nullptr, packetSize, addrs[j], nullptr));
    size += packetSize;
    EXPECT_EQ(batchWriter->size(), size);
  }
  bufAccessor->append(packetSize);
  EXPECT_TRUE(
      batchWriter->append(nullptr, packetSize, addrs[batchSize - 1], nullptr));
  size += packetSize;
  EXPECT_EQ(batchWriter->size(), size);

  EXPECT_CALL(sock, writemGSO(_, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke([&](folly::Range<folly::SocketAddress const*> addrs,
                           iovec* iov,
                           size_t* numIovecsInBuffer,
                           size_t count,
                           const QuicAsyncUDPSocket::WriteOptions* options) {
        EXPECT_EQ(count, 2);
        EXPECT_EQ(numIovecsInBuffer[0], 3);
        EXPECT_EQ(numIovecsInBuffer[1], 2);

        EXPECT_EQ(options[0].gso, packetSize);
        EXPECT_EQ(options[1].gso, packetSize);

        // All packets are of size packetSize
        for (uint32_t i = 0; i < batchSize; i++) {
          EXPECT_EQ(iov[i].iov_len, packetSize);
        }

        // If the shared buffer looks like this:
        // [slot1, slot2, slot3, slot4, slot5]
        // Then iov should look like
        // [slot1, slot2, slot5, slot3, slot4]
        auto* bufferStart = (uint8_t*)bufAccessor->buf()->buffer();
        std::vector<uint8_t*> expectedBufferStartPositions = {
            bufferStart,
            bufferStart + packetSize,
            bufferStart + 4 * packetSize,
            bufferStart + 2 * packetSize,
            bufferStart + 3 * packetSize};
        for (uint32_t i = 0; i < batchSize; i++) {
          EXPECT_EQ(iov[i].iov_base, expectedBufferStartPositions[i]);
        }

        EXPECT_EQ(addrs[0], addrA);
        EXPECT_EQ(addrs[1], addrB);

        return 2;
      }));

  batchWriter->write(sock, folly::SocketAddress());
  EXPECT_TRUE(bufAccessor->buf()->empty());
}

// Test the case where we send 5 packets, all of the same size, to the
// same address, with an external writer writing data to the shared buffer
// right before we write the batch.
TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOInplaceExternalDataWritten) {
  gsoSupported_ = true;
  size_t batchSize = 5;
  size_t packetSize = 100;

  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);

  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();

  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  CHECK(batchWriter->empty());
  CHECK_EQ(batchWriter->size(), 0);
  size_t size = 0;
  for (size_t j = 0; j < batchSize - 1; j++) {
    bufAccessor->append(packetSize);
    EXPECT_FALSE(batchWriter->append(
        nullptr, packetSize, folly::SocketAddress(), nullptr));
    size += packetSize;
    EXPECT_EQ(batchWriter->size(), size);
  }
  bufAccessor->append(packetSize);
  EXPECT_TRUE(batchWriter->append(
      nullptr, packetSize, folly::SocketAddress(), nullptr));
  size += packetSize;
  EXPECT_EQ(batchWriter->size(), size);

  EXPECT_CALL(sock, writeGSO(_, _, _, _))
      .Times(1)
      .WillOnce(Invoke([&](const folly::SocketAddress&,
                           const struct iovec* iovecs,
                           size_t iovec_len,
                           QuicAsyncUDPSocket::WriteOptions writeOptions) {
        EXPECT_EQ(iovec_len, 5);
        EXPECT_EQ(writeOptions.gso, packetSize);

        for (uint32_t i = 0; i < 5; i++) {
          EXPECT_EQ(
              iovecs[i].iov_base,
              (uint8_t*)bufAccessor->buf()->buffer() + packetSize * i);
          EXPECT_EQ(iovecs[i].iov_len, packetSize);
        }

        return 1;
      }));
  std::string externalData = "external data";
  memcpy(
      bufAccessor->buf()->writableTail(),
      externalData.data(),
      externalData.size());
  bufAccessor->buf()->append(externalData.size());
  batchWriter->write(sock, folly::SocketAddress());
  EXPECT_EQ(bufAccessor->buf()->length(), externalData.size());
  EXPECT_EQ(
      memcmp(
          externalData.data(), bufAccessor->buf()->data(), externalData.size()),
      0);
}

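// The InplaceWriter tests below exercise the GSO batch writer on the
// ContinuousMemory data path, where packets live back to back in the shared
// BufAccessor buffer owned by the connection.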
TEST_F(QuicBatchWriterTest, InplaceWriterNeedsFlush) {
  gsoSupported_ = true;
  uint32_t batchSize = 20;
  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  EXPECT_FALSE(batchWriter->needsFlush(1000));

  for (size_t i = 0; i < 10; i++) {
    EXPECT_FALSE(batchWriter->needsFlush(1000));
    batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr);
  }
  EXPECT_TRUE(batchWriter->needsFlush(conn_.udpSendPacketLen));
}

TEST_F(QuicBatchWriterTest, InplaceWriterAppendLimit) {
  gsoSupported_ = true;
  uint32_t batchSize = 20;
  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  EXPECT_FALSE(batchWriter->needsFlush(1000));

  for (size_t i = 0; i < batchSize - 1; i++) {
    auto buf = bufAccessor->obtain();
    buf->append(1000);
    bufAccessor->release(std::move(buf));
    EXPECT_FALSE(
        batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr));
  }

  auto buf = bufAccessor->obtain();
  buf->append(1000);
  bufAccessor->release(std::move(buf));
  EXPECT_TRUE(
      batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr));
}

TEST_F(QuicBatchWriterTest, InplaceWriterAppendSmaller) {
  gsoSupported_ = true;
  uint32_t batchSize = 20;
  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  EXPECT_FALSE(batchWriter->needsFlush(1000));

  for (size_t i = 0; i < batchSize / 2; i++) {
    auto buf = bufAccessor->obtain();
    buf->append(1000);
    bufAccessor->release(std::move(buf));
    EXPECT_FALSE(
        batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr));
  }

  auto buf = bufAccessor->obtain();
  buf->append(700);
  bufAccessor->release(std::move(buf));
  EXPECT_TRUE(
      batchWriter->append(nullptr, 700, folly::SocketAddress(), nullptr));
}

TEST_F(QuicBatchWriterTest, InplaceWriterWriteAll) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);
  uint32_t batchSize = 20;
  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();
  gsoSupported_ = true;
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  ASSERT_FALSE(batchWriter->needsFlush(1000));

  for (size_t i = 0; i < 5; i++) {
    auto buf = bufAccessor->obtain();
    buf->append(1000);
    bufAccessor->release(std::move(buf));
    ASSERT_FALSE(
        batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr));
  }
  auto buf = bufAccessor->obtain();
  buf->append(700);
  bufAccessor->release(std::move(buf));
  ASSERT_TRUE(
      batchWriter->append(nullptr, 700, folly::SocketAddress(), nullptr));

  EXPECT_CALL(sock, writeGSO(_, _, _, _))
      .Times(1)
      .WillOnce(Invoke([&](const auto& /* addr */,
                           const struct iovec* vec,
                           size_t,
                           QuicAsyncUDPSocket::WriteOptions options) {
        EXPECT_EQ(1000 * 5 + 700, vec[0].iov_len);
        EXPECT_EQ(1000, options.gso);
        return 1000 * 5 + 700;
      }));
  EXPECT_EQ(1000 * 5 + 700, batchWriter->write(sock, folly::SocketAddress()));

  EXPECT_TRUE(bufAccessor->ownsBuffer());
  buf = bufAccessor->obtain();
  EXPECT_EQ(0, buf->length());
}

TEST_F(QuicBatchWriterTest, InplaceWriterWriteOne) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);
  uint32_t batchSize = 20;
  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();
  gsoSupported_ = true;
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  CHECK(batchWriter);
  ASSERT_FALSE(batchWriter->needsFlush(1000));

  auto buf = bufAccessor->obtain();
  buf->append(1000);
  bufAccessor->release(std::move(buf));
  ASSERT_FALSE(
      batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr));

  EXPECT_CALL(sock, writeGSO(_, _, _, _))
      .Times(1)
      .WillOnce(Invoke(
          [&](const auto& /* addr */, const struct iovec* vec, size_t, auto) {
            EXPECT_EQ(1000, vec[0].iov_len);
            return 1000;
          }));
  EXPECT_EQ(1000, batchWriter->write(sock, folly::SocketAddress()));

  EXPECT_TRUE(bufAccessor->ownsBuffer());
  buf = bufAccessor->obtain();
  EXPECT_EQ(0, buf->length());
}

TEST_F(QuicBatchWriterTest, InplaceWriterLastOneTooBig) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);
  uint32_t batchSize = 20;
  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();
  gsoSupported_ = true;
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  for (size_t i = 0; i < 5; i++) {
    auto buf = bufAccessor->obtain();
    buf->append(700);
    bufAccessor->release(std::move(buf));
    ASSERT_FALSE(
        batchWriter->append(nullptr, 700, folly::SocketAddress(), nullptr));
  }
  auto buf = bufAccessor->obtain();
  buf->append(1000);
  bufAccessor->release(std::move(buf));
  EXPECT_TRUE(batchWriter->needsFlush(1000));

  EXPECT_CALL(sock, writeGSO(_, _, _, _))
      .Times(1)
      .WillOnce(Invoke([&](const auto& /* addr */,
                           const struct iovec* vec,
                           size_t,
                           QuicAsyncUDPSocket::WriteOptions options) {
        EXPECT_EQ(5 * 700, vec[0].iov_len);
        EXPECT_EQ(700, options.gso);
        return 700 * 5;
      }));
  EXPECT_EQ(5 * 700, batchWriter->write(sock, folly::SocketAddress()));

  EXPECT_TRUE(bufAccessor->ownsBuffer());
  buf = bufAccessor->obtain();
  EXPECT_EQ(1000, buf->length());
  EXPECT_EQ(0, buf->headroom());
}

TEST_F(QuicBatchWriterTest, InplaceWriterBufResidueCheck) {
  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);
  gsoSupported_ = true;

  uint32_t batchSize = 20;
  auto bufAccessor =
      std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
  conn_.bufAccessor = bufAccessor.get();
  conn_.udpSendPacketLen = 1000;
  auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
      quic::QuicBatchingMode::BATCHING_MODE_GSO,
      batchSize,
      false, /* enable backpressure */
      DataPathType::ContinuousMemory,
      conn_,
      gsoSupported_);
  auto buf = bufAccessor->obtain();
  folly::IOBuf* Buf = buf.get();
  bufAccessor->release(std::move(buf));
  Buf->append(700);
  ASSERT_FALSE(
      batchWriter->append(nullptr, 700, folly::SocketAddress(), nullptr));

  // There is a check against packets that are 10 or more bytes larger than
  // the size limit.
  size_t packetSizeBig = 1009;
  Buf->append(packetSizeBig);
  EXPECT_TRUE(batchWriter->needsFlush(packetSizeBig));

  EXPECT_CALL(sock, writeGSO(_, _, _, _))
      .Times(1)
      .WillOnce(Invoke(
          [&](const auto& /* addr */, const struct iovec* vec, size_t, auto) {
            EXPECT_EQ(700, vec[0].iov_len);
            return 700;
          }));
  // No crash:
  EXPECT_EQ(700, batchWriter->write(sock, folly::SocketAddress()));
  EXPECT_EQ(1009, Buf->length());
  EXPECT_EQ(0, Buf->headroom());
}

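// SinglePacketInplaceBatchWriter is only produced by the factory when the
// transport is configured with maxBatchSize == 1, the ContinuousMemory data
// path, and BATCHING_MODE_NONE; the tests below cover both the positive and
// the fallback factory cases.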
class SinglePacketInplaceBatchWriterTest : public ::testing::Test {
 public:
  SinglePacketInplaceBatchWriterTest()
      : conn_(FizzServerQuicHandshakeContext::Builder().build()) {}

  void SetUp() override {
    bufAccessor_ = std::make_unique<quic::BufAccessor>(conn_.udpSendPacketLen);
    conn_.bufAccessor = bufAccessor_.get();
  }

  quic::BatchWriterPtr makeBatchWriter(
      quic::QuicBatchingMode batchingMode =
          quic::QuicBatchingMode::BATCHING_MODE_NONE) {
    return quic::BatchWriterFactory::makeBatchWriter(
        batchingMode,
        conn_.transportSettings.maxBatchSize,
        conn_.transportSettings.enableWriterBackpressure,
        conn_.transportSettings.dataPathType,
        conn_,
        false /* gsoSupported_ */);
  }

  void enableSinglePacketInplaceBatchWriter() {
    conn_.transportSettings.maxBatchSize = 1;
    conn_.transportSettings.dataPathType = DataPathType::ContinuousMemory;
  }

 protected:
  std::unique_ptr<quic::BufAccessor> bufAccessor_;
  QuicServerConnectionState conn_;
};

TEST_F(SinglePacketInplaceBatchWriterTest, TestFactorySuccess) {
  enableSinglePacketInplaceBatchWriter();

  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  CHECK(dynamic_cast<quic::SinglePacketInplaceBatchWriter*>(batchWriter.get()));
}

TEST_F(SinglePacketInplaceBatchWriterTest, TestFactoryNoTransportSetting) {
  conn_.transportSettings.maxBatchSize = 1;
  conn_.transportSettings.dataPathType = DataPathType::ChainedMemory;
  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  EXPECT_EQ(
      dynamic_cast<quic::SinglePacketInplaceBatchWriter*>(batchWriter.get()),
      nullptr);
}

TEST_F(SinglePacketInplaceBatchWriterTest, TestFactoryNoTransportSetting2) {
  conn_.transportSettings.maxBatchSize = 16;
  conn_.transportSettings.dataPathType = DataPathType::ContinuousMemory;
  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  EXPECT_EQ(
      dynamic_cast<quic::SinglePacketInplaceBatchWriter*>(batchWriter.get()),
      nullptr);
}

TEST_F(SinglePacketInplaceBatchWriterTest, TestFactoryWrongBatchingMode) {
  enableSinglePacketInplaceBatchWriter();

  auto batchWriter = makeBatchWriter(quic::QuicBatchingMode::BATCHING_MODE_GSO);
  CHECK(batchWriter);
  EXPECT_EQ(
      dynamic_cast<quic::SinglePacketInplaceBatchWriter*>(batchWriter.get()),
      nullptr);
}

TEST_F(SinglePacketInplaceBatchWriterTest, TestReset) {
  enableSinglePacketInplaceBatchWriter();

  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  CHECK(dynamic_cast<quic::SinglePacketInplaceBatchWriter*>(batchWriter.get()));

  auto buf = bufAccessor_->obtain();
  folly::IOBuf* Buf = buf.get();
  bufAccessor_->release(std::move(buf));
  Buf->append(700);

  EXPECT_EQ(Buf->computeChainDataLength(), 700);
  batchWriter->reset();
  EXPECT_EQ(Buf->computeChainDataLength(), 0);
}

TEST_F(SinglePacketInplaceBatchWriterTest, TestAppend) {
  enableSinglePacketInplaceBatchWriter();

  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  CHECK(dynamic_cast<quic::SinglePacketInplaceBatchWriter*>(batchWriter.get()));

  EXPECT_EQ(
      true, batchWriter->append(nullptr, 0, folly::SocketAddress(), nullptr));
}

TEST_F(SinglePacketInplaceBatchWriterTest, TestEmpty) {
  enableSinglePacketInplaceBatchWriter();

  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  CHECK(dynamic_cast<quic::SinglePacketInplaceBatchWriter*>(batchWriter.get()));
  EXPECT_TRUE(batchWriter->empty());

  auto buf = bufAccessor_->obtain();
  folly::IOBuf* Buf = buf.get();
  bufAccessor_->release(std::move(buf));
  Buf->append(700);

  EXPECT_EQ(Buf->computeChainDataLength(), 700);
  EXPECT_FALSE(batchWriter->empty());

  batchWriter->reset();
  EXPECT_TRUE(batchWriter->empty());
}

TEST_F(SinglePacketInplaceBatchWriterTest, TestWrite) {
  enableSinglePacketInplaceBatchWriter();

  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  CHECK(dynamic_cast<quic::SinglePacketInplaceBatchWriter*>(batchWriter.get()));
  EXPECT_TRUE(batchWriter->empty());

  auto buf = bufAccessor_->obtain();
  folly::IOBuf* Buf = buf.get();
  bufAccessor_->release(std::move(buf));
  const auto appendSize = conn_.udpSendPacketLen - 200;
  Buf->append(appendSize);

  EXPECT_EQ(Buf->computeChainDataLength(), appendSize);
  EXPECT_FALSE(batchWriter->empty());

  folly::EventBase evb;
  std::shared_ptr<FollyQuicEventBase> qEvb =
      std::make_shared<FollyQuicEventBase>(&evb);
  quic::test::MockAsyncUDPSocket sock(qEvb);
  EXPECT_CALL(sock, write(_, _, _))
      .Times(1)
      .WillOnce(
          Invoke([&](const auto& /* addr */, const struct iovec* vec, size_t) {
            EXPECT_EQ(appendSize, vec[0].iov_len);
            return appendSize;
          }));
  EXPECT_EQ(appendSize, batchWriter->write(sock, folly::SocketAddress()));
  EXPECT_TRUE(batchWriter->empty());
}

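// SinglePacketBackpressureBatchWriter: when a write fails with EAGAIN, the
// packet is kept by the writer and, on destruction, handed back to the
// connection state (pendingWriteBatch_) so a newly created writer can retry
// it later. The tests below exercise that hand-off.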
struct SinglePacketBackpressureBatchWriterTest : public ::testing::Test {
  SinglePacketBackpressureBatchWriterTest()
      : conn_(FizzServerQuicHandshakeContext::Builder().build()),
        qEvb_(std::make_shared<FollyQuicEventBase>(&evb_)),
        sock_(qEvb_) {
    conn_.transportSettings.dataPathType = DataPathType::ChainedMemory;
    conn_.transportSettings.batchingMode = QuicBatchingMode::BATCHING_MODE_NONE;
    conn_.transportSettings.maxBatchSize = 1;
    conn_.transportSettings.enableWriterBackpressure = true;
    conn_.transportSettings.useSockWritableEvents = true;
  }

  BatchWriterPtr makeBatchWriter() {
    return quic::BatchWriterFactory::makeBatchWriter(
        conn_.transportSettings.batchingMode,
        conn_.transportSettings.maxBatchSize,
        conn_.transportSettings.enableWriterBackpressure,
        conn_.transportSettings.dataPathType,
        conn_,
        false /* gsoSupported */);
  }

 protected:
  QuicServerConnectionState conn_;
  folly::EventBase evb_;
  std::shared_ptr<FollyQuicEventBase> qEvb_;
  quic::test::MockAsyncUDPSocket sock_;
};

TEST_F(SinglePacketBackpressureBatchWriterTest, TestAppendRequestsFlush) {
  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  CHECK(dynamic_cast<quic::SinglePacketBackpressureBatchWriter*>(
      batchWriter.get()));
  EXPECT_TRUE(batchWriter->empty());

  auto buf = folly::IOBuf::copyBuffer("append attempt");
  const auto bufLen = buf->computeChainDataLength();
  EXPECT_TRUE(batchWriter->append(
      std::move(buf), bufLen, folly::SocketAddress(), &sock_));
}

TEST_F(SinglePacketBackpressureBatchWriterTest, TestFailedWriteCachedOnEAGAIN) {
  auto batchWriter = makeBatchWriter();
  CHECK(batchWriter);
  CHECK(dynamic_cast<quic::SinglePacketBackpressureBatchWriter*>(
      batchWriter.get()));
  EXPECT_TRUE(batchWriter->empty());

  std::string testString = "append attempt";
  auto buf = folly::IOBuf::copyBuffer(testString);
  const auto bufLen = buf->computeChainDataLength();

  EXPECT_TRUE(batchWriter->append(
      std::move(buf), bufLen, folly::SocketAddress(), &sock_));

  EXPECT_CALL(sock_, write(_, _, _))
      .Times(1)
      .WillOnce(Invoke([&](const auto& /* addr */,
                           const struct iovec* /* vec */,
                           size_t /* iovec_len */) {
        errno = EAGAIN;
        return 0;
      }));
  // The write fails
  EXPECT_EQ(batchWriter->write(sock_, folly::SocketAddress()), 0);

  // Resetting does not clear the cached buffer from the writer but the buffer
  // is not yet cached in the transport.
  batchWriter->reset();
  EXPECT_FALSE(conn_.pendingWriteBatch_.buf);

  // Destroying the writer caches the buffer in the transport.
  batchWriter = nullptr;
  EXPECT_TRUE(conn_.pendingWriteBatch_.buf);

  // A new batch writer picks up the cached buffer from the transport
  batchWriter = makeBatchWriter();
  EXPECT_FALSE(conn_.pendingWriteBatch_.buf);

  // The write succeeds
  EXPECT_CALL(sock_, write(_, _, _))
      .Times(1)
      .WillOnce(Invoke([&](const auto& /* addr */,
                           const struct iovec* vec,
                           size_t iovec_len) {
        return ::quic::test::getTotalIovecLen(vec, iovec_len);
      }));
  EXPECT_EQ(
      batchWriter->write(sock_, folly::SocketAddress()), testString.size());

  // Nothing is cached in the transport after the writer is reset and destroyed.
  batchWriter->reset();
  batchWriter = nullptr;
  EXPECT_FALSE(conn_.pendingWriteBatch_.buf);
}

} // namespace quic::testing