diff --git a/quic/api/QuicBatchWriter.cpp b/quic/api/QuicBatchWriter.cpp index 03b2d5b87..189434ac4 100644 --- a/quic/api/QuicBatchWriter.cpp +++ b/quic/api/QuicBatchWriter.cpp @@ -218,6 +218,109 @@ ssize_t GSOPacketBatchWriter::write( : sock.write(address, buf_); } +GSOInplacePacketBatchWriter::GSOInplacePacketBatchWriter( + QuicConnectionStateBase& conn, + size_t maxPackets) + : conn_(conn), maxPackets_(maxPackets) {} + +void GSOInplacePacketBatchWriter::reset() { + lastPacketEnd_ = nullptr; + prevSize_ = 0; + numPackets_ = 0; +} + +bool GSOInplacePacketBatchWriter::needsFlush(size_t size) { + return prevSize_ && size > prevSize_; +} + +bool GSOInplacePacketBatchWriter::append( + std::unique_ptr&& /*buf*/, + size_t size, + const folly::SocketAddress& /* addr */, + folly::AsyncUDPSocket* /* sock */) { + CHECK(!needsFlush(size)); + ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor); + auto& buf = scopedBufAccessor.buf(); + if (!lastPacketEnd_) { + CHECK(prevSize_ == 0 && numPackets_ == 0); + prevSize_ = size; + lastPacketEnd_ = buf->tail(); + numPackets_ = 1; + return false; + } + + CHECK(prevSize_ && prevSize_ >= size); + ++numPackets_; + lastPacketEnd_ = buf->tail(); + if (prevSize_ > size || numPackets_ == maxPackets_) { + return true; + } + return false; +} + +/** + * Write the buffer owned by conn_.bufAccessor to the sock, until + * lastPacketEnd_. After write, everything in the buffer after lastPacketEnd_ + * will be moved to the beginning of the buffer, and buffer will be returned to + * conn_.bufAccessor. + */ +ssize_t GSOInplacePacketBatchWriter::write( + folly::AsyncUDPSocket& sock, + const folly::SocketAddress& address) { + ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor); + CHECK(lastPacketEnd_); + auto& buf = scopedBufAccessor.buf(); + CHECK(!buf->isChained()); + CHECK(lastPacketEnd_ >= buf->data() && lastPacketEnd_ <= buf->tail()) + << "lastPacketEnd_=" << (long)lastPacketEnd_ + << " data=" << (long)buf->data() << " tail=" << (long)buf->tail(); + auto diffToEnd = buf->tail() - lastPacketEnd_; + CHECK( + diffToEnd >= 0 && + static_cast(diffToEnd) <= conn_.udpSendPacketLen); + auto diffToStart = lastPacketEnd_ - buf->data(); + buf->trimEnd(diffToEnd); + auto bytesWritten = (numPackets_ > 1) + ? sock.writeGSO(address, buf, static_cast(prevSize_)) + : sock.write(address, buf); + /** + * If there is one more bytes after lastPacketEnd_, that means there is a + * packet we choose not to write in this batch (e.g., it has a size larger + * than all existing packets in this batch). So after the socket write, we + * need to move that packet from the middle of the buffer to the beginning of + * the buffer so make sure we maximize the buffer space. An alternative here + * is to writem to write everything out in the previous sock write call. But + * that needs a much bigger change in the IoBufQuicBatch API. + */ + if (diffToEnd) { + buf->trimStart(diffToStart); + buf->append(diffToEnd); + buf->retreat(diffToStart); + CHECK(buf->length() <= conn_.udpSendPacketLen); + CHECK(0 == buf->headroom()); + } else { + buf->clear(); + } + reset(); + return bytesWritten; +} + +bool GSOInplacePacketBatchWriter::empty() const { + return numPackets_ == 0; +} + +size_t GSOInplacePacketBatchWriter::size() const { + if (empty()) { + return 0; + } + ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor); + CHECK(lastPacketEnd_); + auto& buf = scopedBufAccessor.buf(); + CHECK(lastPacketEnd_ >= buf->data() && lastPacketEnd_ <= buf->tail()); + size_t ret = lastPacketEnd_ - buf->data(); + return ret; +} + // SendmmsgPacketBatchWriter SendmmsgPacketBatchWriter::SendmmsgPacketBatchWriter(size_t maxBufs) : maxBufs_(maxBufs) { @@ -390,7 +493,9 @@ BatchWriterPtr BatchWriterFactory::makeBatchWriter( const quic::QuicBatchingMode& batchingMode, uint32_t batchSize, bool useThreadLocal, - const std::chrono::microseconds& threadLocalDelay) { + const std::chrono::microseconds& threadLocalDelay, + DataPathType dataPathType, + QuicConnectionStateBase& conn) { #if USE_THREAD_LOCAL_BATCH_WRITER if (useThreadLocal && (batchingMode == quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO) && @@ -414,7 +519,10 @@ BatchWriterPtr BatchWriterFactory::makeBatchWriter( return BatchWriterPtr(new SinglePacketBatchWriter()); case quic::QuicBatchingMode::BATCHING_MODE_GSO: { if (sock.getGSO() >= 0) { - return BatchWriterPtr(new GSOPacketBatchWriter(batchSize)); + if (dataPathType == DataPathType::ChainedMemory) { + return BatchWriterPtr(new GSOPacketBatchWriter(batchSize)); + } + return BatchWriterPtr(new GSOInplacePacketBatchWriter(conn, batchSize)); } return BatchWriterPtr(new SinglePacketBatchWriter()); diff --git a/quic/api/QuicBatchWriter.h b/quic/api/QuicBatchWriter.h index eb5841a5f..47f54ec7e 100644 --- a/quic/api/QuicBatchWriter.h +++ b/quic/api/QuicBatchWriter.h @@ -12,6 +12,7 @@ #include #include #include +#include namespace quic { class BatchWriter { @@ -128,6 +129,54 @@ class GSOPacketBatchWriter : public IOBufBatchWriter { size_t prevSize_{0}; }; +class GSOInplacePacketBatchWriter : public BatchWriter { + public: + explicit GSOInplacePacketBatchWriter( + QuicConnectionStateBase& conn, + size_t maxPackets); + ~GSOInplacePacketBatchWriter() override = default; + + struct ScopedBufAccessor { + public: + explicit ScopedBufAccessor(BufAccessor* accessor) : bufAccessor_(accessor) { + CHECK(bufAccessor_->ownsBuffer()); + buf_ = bufAccessor_->obtain(); + } + + ~ScopedBufAccessor() { + bufAccessor_->release(std::move(buf_)); + } + + std::unique_ptr& buf() { + return buf_; + } + + private: + BufAccessor* bufAccessor_; + std::unique_ptr buf_; + }; + + void reset() override; + bool needsFlush(size_t size) override; + bool append( + std::unique_ptr&& buf, + size_t size, + const folly::SocketAddress& addr, + folly::AsyncUDPSocket* sock) override; + ssize_t write( + folly::AsyncUDPSocket& sock, + const folly::SocketAddress& address) override; + bool empty() const override; + size_t size() const override; + + private: + QuicConnectionStateBase& conn_; + size_t maxPackets_; + const uint8_t* lastPacketEnd_{nullptr}; + size_t prevSize_{0}; + size_t numPackets_{0}; +}; + class SendmmsgPacketBatchWriter : public BatchWriter { public: explicit SendmmsgPacketBatchWriter(size_t maxBufs); @@ -203,7 +252,9 @@ class BatchWriterFactory { const quic::QuicBatchingMode& batchingMode, uint32_t batchSize, bool useThreadLocal, - const std::chrono::microseconds& threadLocalDelay); + const std::chrono::microseconds& threadLocalDelay, + DataPathType dataPathType, + QuicConnectionStateBase& conn); }; } // namespace quic diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index a1f478acf..fb50a5c12 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -979,7 +979,9 @@ uint64_t writeConnectionDataToSocket( connection.transportSettings.batchingMode, connection.transportSettings.maxBatchSize, connection.transportSettings.useThreadLocalBatching, - connection.transportSettings.threadLocalDelay); + connection.transportSettings.threadLocalDelay, + connection.transportSettings.dataPathType, + connection); IOBufQuicBatch ioBufBatch( std::move(batchWriter), diff --git a/quic/api/test/QuicBatchWriterTest.cpp b/quic/api/test/QuicBatchWriterTest.cpp index 7cb0fcb7f..ad2fe11c9 100644 --- a/quic/api/test/QuicBatchWriterTest.cpp +++ b/quic/api/test/QuicBatchWriterTest.cpp @@ -8,7 +8,11 @@ #include +#include #include +#include + +using namespace testing; namespace quic { namespace testing { @@ -20,7 +24,10 @@ constexpr const auto kBatchNum = 3; constexpr const auto kNumLoops = 10; struct QuicBatchWriterTest : public ::testing::Test, - public ::testing::WithParamInterface {}; + public ::testing::WithParamInterface { + protected: + QuicServerConnectionState conn_; +}; TEST_P(QuicBatchWriterTest, TestBatchingNone) { bool useThreadLocal = GetParam(); @@ -34,7 +41,9 @@ TEST_P(QuicBatchWriterTest, TestBatchingNone) { quic::QuicBatchingMode::BATCHING_MODE_NONE, kBatchNum, useThreadLocal, - quic::kDefaultThreadLocalDelay); + quic::kDefaultThreadLocalDelay, + DataPathType::ChainedMemory, + conn_); CHECK(batchWriter); std::string strTest('A', kStrLen); @@ -63,7 +72,9 @@ TEST_P(QuicBatchWriterTest, TestBatchingGSOBase) { quic::QuicBatchingMode::BATCHING_MODE_GSO, 1, useThreadLocal, - quic::kDefaultThreadLocalDelay); + quic::kDefaultThreadLocalDelay, + DataPathType::ChainedMemory, + conn_); CHECK(batchWriter); std::string strTest(kStrLen, 'A'); // if GSO is not available, just test we've got a regular @@ -90,7 +101,9 @@ TEST_P(QuicBatchWriterTest, TestBatchingGSOLastSmallPacket) { quic::QuicBatchingMode::BATCHING_MODE_GSO, 1, useThreadLocal, - quic::kDefaultThreadLocalDelay); + quic::kDefaultThreadLocalDelay, + DataPathType::ChainedMemory, + conn_); CHECK(batchWriter); std::string strTest; // only if GSO is available @@ -129,7 +142,9 @@ TEST_P(QuicBatchWriterTest, TestBatchingGSOLastBigPacket) { quic::QuicBatchingMode::BATCHING_MODE_GSO, 1, useThreadLocal, - quic::kDefaultThreadLocalDelay); + quic::kDefaultThreadLocalDelay, + DataPathType::ChainedMemory, + conn_); CHECK(batchWriter); std::string strTest; // only if GSO is available @@ -163,7 +178,9 @@ TEST_P(QuicBatchWriterTest, TestBatchingGSOBatchNum) { quic::QuicBatchingMode::BATCHING_MODE_GSO, kBatchNum, useThreadLocal, - quic::kDefaultThreadLocalDelay); + quic::kDefaultThreadLocalDelay, + DataPathType::ChainedMemory, + conn_); CHECK(batchWriter); std::string strTest(kStrLen, 'A'); // if GSO is not available, just test we've got a regular @@ -206,7 +223,9 @@ TEST_P(QuicBatchWriterTest, TestBatchingSendmmsg) { quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG, kBatchNum, useThreadLocal, - quic::kDefaultThreadLocalDelay); + quic::kDefaultThreadLocalDelay, + DataPathType::ChainedMemory, + conn_); CHECK(batchWriter); std::string strTest(kStrLen, 'A'); @@ -246,7 +265,9 @@ TEST_P(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatchNum) { quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO, kBatchNum, useThreadLocal, - quic::kDefaultThreadLocalDelay); + quic::kDefaultThreadLocalDelay, + DataPathType::ChainedMemory, + conn_); CHECK(batchWriter); std::string strTest(kStrLen, 'A'); // if GSO is not available, just test we've got a regular @@ -289,7 +310,9 @@ TEST_P(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatcBigSmallPacket) { quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO, 3 * kBatchNum, useThreadLocal, - quic::kDefaultThreadLocalDelay); + quic::kDefaultThreadLocalDelay, + DataPathType::ChainedMemory, + conn_); CHECK(batchWriter); std::string strTest(kStrLen, 'A'); // if GSO is not available, just test we've got a regular @@ -325,6 +348,241 @@ TEST_P(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatcBigSmallPacket) { } } +TEST_P(QuicBatchWriterTest, InplaceWriterNeedsFlush) { + bool useThreadLocal = GetParam(); + folly::EventBase evb; + folly::AsyncUDPSocket sock(&evb); + sock.setReuseAddr(false); + sock.bind(folly::SocketAddress("127.0.0.1", 0)); + uint32_t batchSize = 20; + auto bufAccessor = + std::make_unique(conn_.udpSendPacketLen * batchSize); + conn_.bufAccessor = bufAccessor.get(); + auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( + sock, + quic::QuicBatchingMode::BATCHING_MODE_GSO, + batchSize, + useThreadLocal, + quic::kDefaultThreadLocalDelay, + DataPathType::ContinuousMemory, + conn_); + CHECK(batchWriter); + EXPECT_FALSE(batchWriter->needsFlush(1000)); + + for (size_t i = 0; i < 10; i++) { + EXPECT_FALSE(batchWriter->needsFlush(1000)); + batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr); + } + EXPECT_TRUE(batchWriter->needsFlush(conn_.udpSendPacketLen)); +} + +TEST_P(QuicBatchWriterTest, InplaceWriterAppendLimit) { + bool useThreadLocal = GetParam(); + folly::EventBase evb; + folly::AsyncUDPSocket sock(&evb); + sock.setReuseAddr(false); + sock.bind(folly::SocketAddress("127.0.0.1", 0)); + uint32_t batchSize = 20; + auto bufAccessor = + std::make_unique(conn_.udpSendPacketLen * batchSize); + conn_.bufAccessor = bufAccessor.get(); + auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( + sock, + quic::QuicBatchingMode::BATCHING_MODE_GSO, + batchSize, + useThreadLocal, + quic::kDefaultThreadLocalDelay, + DataPathType::ContinuousMemory, + conn_); + CHECK(batchWriter); + EXPECT_FALSE(batchWriter->needsFlush(1000)); + + for (size_t i = 0; i < batchSize - 1; i++) { + auto buf = bufAccessor->obtain(); + buf->append(1000); + bufAccessor->release(std::move(buf)); + EXPECT_FALSE( + batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr)); + } + + auto buf = bufAccessor->obtain(); + buf->append(1000); + bufAccessor->release(std::move(buf)); + EXPECT_TRUE( + batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr)); +} + +TEST_P(QuicBatchWriterTest, InplaceWriterAppendSmaller) { + bool useThreadLocal = GetParam(); + folly::EventBase evb; + folly::AsyncUDPSocket sock(&evb); + sock.setReuseAddr(false); + sock.bind(folly::SocketAddress("127.0.0.1", 0)); + uint32_t batchSize = 20; + auto bufAccessor = + std::make_unique(conn_.udpSendPacketLen * batchSize); + conn_.bufAccessor = bufAccessor.get(); + auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( + sock, + quic::QuicBatchingMode::BATCHING_MODE_GSO, + batchSize, + useThreadLocal, + quic::kDefaultThreadLocalDelay, + DataPathType::ContinuousMemory, + conn_); + CHECK(batchWriter); + EXPECT_FALSE(batchWriter->needsFlush(1000)); + + for (size_t i = 0; i < batchSize / 2; i++) { + auto buf = bufAccessor->obtain(); + buf->append(1000); + bufAccessor->release(std::move(buf)); + EXPECT_FALSE( + batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr)); + } + + auto buf = bufAccessor->obtain(); + buf->append(700); + bufAccessor->release(std::move(buf)); + EXPECT_TRUE( + batchWriter->append(nullptr, 700, folly::SocketAddress(), nullptr)); +} + +TEST_P(QuicBatchWriterTest, InplaceWriterWriteAll) { + bool useThreadLocal = GetParam(); + folly::EventBase evb; + folly::test::MockAsyncUDPSocket sock(&evb); + uint32_t batchSize = 20; + auto bufAccessor = + std::make_unique(conn_.udpSendPacketLen * batchSize); + conn_.bufAccessor = bufAccessor.get(); + EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1)); + auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( + sock, + quic::QuicBatchingMode::BATCHING_MODE_GSO, + batchSize, + useThreadLocal, + quic::kDefaultThreadLocalDelay, + DataPathType::ContinuousMemory, + conn_); + CHECK(batchWriter); + ASSERT_FALSE(batchWriter->needsFlush(1000)); + + for (size_t i = 0; i < 5; i++) { + auto buf = bufAccessor->obtain(); + buf->append(1000); + bufAccessor->release(std::move(buf)); + ASSERT_FALSE( + batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr)); + } + auto buf = bufAccessor->obtain(); + buf->append(700); + bufAccessor->release(std::move(buf)); + ASSERT_TRUE( + batchWriter->append(nullptr, 700, folly::SocketAddress(), nullptr)); + + EXPECT_CALL(sock, writeGSO(_, _, _)) + .Times(1) + .WillOnce(Invoke([&](const auto& /* addr */, + const std::unique_ptr& buf, + int gso) { + EXPECT_EQ(1000 * 5 + 700, buf->length()); + EXPECT_EQ(1000, gso); + return 1000 * 5 + 700; + })); + EXPECT_EQ(1000 * 5 + 700, batchWriter->write(sock, folly::SocketAddress())); + + EXPECT_TRUE(bufAccessor->ownsBuffer()); + buf = bufAccessor->obtain(); + EXPECT_EQ(0, buf->length()); +} + +TEST_P(QuicBatchWriterTest, InplaceWriterWriteOne) { + bool useThreadLocal = GetParam(); + folly::EventBase evb; + folly::test::MockAsyncUDPSocket sock(&evb); + uint32_t batchSize = 20; + auto bufAccessor = + std::make_unique(conn_.udpSendPacketLen * batchSize); + conn_.bufAccessor = bufAccessor.get(); + EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1)); + auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( + sock, + quic::QuicBatchingMode::BATCHING_MODE_GSO, + batchSize, + useThreadLocal, + quic::kDefaultThreadLocalDelay, + DataPathType::ContinuousMemory, + conn_); + CHECK(batchWriter); + ASSERT_FALSE(batchWriter->needsFlush(1000)); + + auto buf = bufAccessor->obtain(); + buf->append(1000); + bufAccessor->release(std::move(buf)); + ASSERT_FALSE( + batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr)); + + EXPECT_CALL(sock, write(_, _)) + .Times(1) + .WillOnce(Invoke([&](const auto& /* addr */, + const std::unique_ptr& buf) { + EXPECT_EQ(1000, buf->length()); + return 1000; + })); + EXPECT_EQ(1000, batchWriter->write(sock, folly::SocketAddress())); + + EXPECT_TRUE(bufAccessor->ownsBuffer()); + buf = bufAccessor->obtain(); + EXPECT_EQ(0, buf->length()); +} + +TEST_P(QuicBatchWriterTest, InplaceWriterLastOneTooBig) { + bool useThreadLocal = GetParam(); + folly::EventBase evb; + folly::test::MockAsyncUDPSocket sock(&evb); + uint32_t batchSize = 20; + auto bufAccessor = + std::make_unique(conn_.udpSendPacketLen * batchSize); + conn_.bufAccessor = bufAccessor.get(); + EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1)); + auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( + sock, + quic::QuicBatchingMode::BATCHING_MODE_GSO, + batchSize, + useThreadLocal, + quic::kDefaultThreadLocalDelay, + DataPathType::ContinuousMemory, + conn_); + for (size_t i = 0; i < 5; i++) { + auto buf = bufAccessor->obtain(); + buf->append(700); + bufAccessor->release(std::move(buf)); + ASSERT_FALSE( + batchWriter->append(nullptr, 700, folly::SocketAddress(), nullptr)); + } + auto buf = bufAccessor->obtain(); + buf->append(1000); + bufAccessor->release(std::move(buf)); + EXPECT_TRUE(batchWriter->needsFlush(1000)); + + EXPECT_CALL(sock, writeGSO(_, _, _)) + .Times(1) + .WillOnce(Invoke([&](const auto& /* addr */, + const std::unique_ptr& buf, + int gso) { + EXPECT_EQ(5 * 700, buf->length()); + EXPECT_EQ(700, gso); + return 700 * 5; + })); + EXPECT_EQ(5 * 700, batchWriter->write(sock, folly::SocketAddress())); + + EXPECT_TRUE(bufAccessor->ownsBuffer()); + buf = bufAccessor->obtain(); + EXPECT_EQ(1000, buf->length()); + EXPECT_EQ(0, buf->headroom()); +} + INSTANTIATE_TEST_CASE_P( QuicBatchWriterTest, QuicBatchWriterTest,