From c6a141bf80ff0dc1a01be68dd565c0edb5c62066 Mon Sep 17 00:00:00 2001 From: Paul Farcasanu Date: Wed, 7 May 2025 09:32:06 -0700 Subject: [PATCH] Support transport isPriming Summary: When the transport is in priming mode it saves all packets instead of writing them on the wire, and feeds them to a callback for the caller to get the data. Meta: Priming mode is used to get all packets before 1-RTT cipher is available, in order for them to get replayed later. Reviewed By: kvtsoy Differential Revision: D71290230 fbshipit-source-id: 230650cb1e5901069dda4ef850c9c724bf33b6be --- quic/api/QuicSocketLite.h | 8 ++ quic/api/QuicTransportBaseLite.cpp | 4 + quic/api/QuicTransportFunctions.cpp | 14 +++ quic/api/test/Mocks.h | 5 + quic/client/QuicClientTransportLite.cpp | 7 ++ quic/client/test/BUCK | 16 ++++ quic/client/test/CMakeLists.txt | 1 + .../test/QuicClientTransportLiteTest.cpp | 95 +++++++++++++++++++ quic/common/BufAccessor.cpp | 2 +- quic/common/test/BufAccessorTest.cpp | 2 +- quic/loss/QuicLossFunctions.cpp | 4 + quic/state/StateData.h | 3 + quic/state/TransportSettings.h | 4 + 13 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 quic/client/test/QuicClientTransportLiteTest.cpp diff --git a/quic/api/QuicSocketLite.h b/quic/api/QuicSocketLite.h index c6c772c80..15cd64fea 100644 --- a/quic/api/QuicSocketLite.h +++ b/quic/api/QuicSocketLite.h @@ -114,6 +114,14 @@ class QuicSocketLite { * onTransportReady(), signifies full crypto handshake finished. */ virtual void onFullHandshakeDone() noexcept {} + + /** + * Client only. + * Called when the transport is in priming mode and 0-RTT packets are + * available + */ + virtual void onPrimingDataAvailable( + std::vector&& /* data */) noexcept {} }; /** diff --git a/quic/api/QuicTransportBaseLite.cpp b/quic/api/QuicTransportBaseLite.cpp index 257e77fe9..0367d5d2a 100644 --- a/quic/api/QuicTransportBaseLite.cpp +++ b/quic/api/QuicTransportBaseLite.cpp @@ -1321,6 +1321,10 @@ QuicTransportBaseLite::writeSocketData() { if (result.hasError()) { return result; } + if (conn_->transportSettings.isPriming && conn_->primingData_.size() > 0) { + connSetupCallback_->onPrimingDataAvailable( + std::move(conn_->primingData_)); + } if (closeState_ != CloseState::CLOSED) { if (conn_->pendingEvents.closeTransport == true) { return folly::makeUnexpected(QuicError( diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 2bbfff26d..d9683531c 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -327,6 +327,14 @@ continuousMemoryBuildScheduleEncrypt( auto encodedBodySize = encodedSize - headerLen; // Include previous packets back. packetBuf->prepend(prevSize); + if (connection.transportSettings.isPriming && packetBuf) { + packetBuf->coalesce(); + connection.bufAccessor->release( + folly::IOBuf::create(packetBuf->capacity())); + connection.primingData_.emplace_back(std::move(packetBuf)); + return DataPathResult::makeWriteResult( + true, std::move(result.value()), encodedSize, encodedBodySize); + } connection.bufAccessor->release(std::move(packetBuf)); if (encodedSize > connection.udpSendPacketLen) { VLOG(3) << "Quic sending pkt larger than limit, encodedSize=" @@ -424,6 +432,12 @@ iobufChainBasedBuildScheduleEncrypt( VLOG(3) << "Quic sending pkt larger than limit, encodedSize=" << encodedSize << " encodedBodySize=" << encodedBodySize; } + if (connection.transportSettings.isPriming && packetBuf) { + packetBuf->coalesce(); + connection.primingData_.emplace_back(std::move(packetBuf)); + return DataPathResult::makeWriteResult( + true, std::move(result.value()), encodedSize, encodedBodySize); + } auto writeResult = ioBufBatch.write(std::move(packetBuf), encodedSize); if (writeResult.hasError()) { return folly::makeUnexpected(writeResult.error()); diff --git a/quic/api/test/Mocks.h b/quic/api/test/Mocks.h index 0e3fdb13e..f8c18211f 100644 --- a/quic/api/test/Mocks.h +++ b/quic/api/test/Mocks.h @@ -95,6 +95,11 @@ class MockConnectionSetupCallback : public QuicSocket::ConnectionSetupCallback { MOCK_METHOD((void), onTransportReady, (), (noexcept)); MOCK_METHOD((void), onFirstPeerPacketProcessed, (), (noexcept)); MOCK_METHOD((void), onFullHandshakeDone, (), (noexcept)); + MOCK_METHOD( + (void), + onPrimingDataAvailable, + (std::vector&&), + (noexcept)); }; class MockConnectionCallback : public QuicSocket::ConnectionCallback { diff --git a/quic/client/QuicClientTransportLite.cpp b/quic/client/QuicClientTransportLite.cpp index ff0a2cc40..92a5a75a8 100644 --- a/quic/client/QuicClientTransportLite.cpp +++ b/quic/client/QuicClientTransportLite.cpp @@ -1201,6 +1201,13 @@ QuicClientTransportLite::startCryptoHandshake() { clientPtr->connSetupCallback_->onTransportReady(); } }); + } else if (clientConn_->transportSettings.isPriming) { + auto clientPtr = dynamic_cast(self.get()); + if (clientPtr->connSetupCallback_) { + clientPtr->connSetupCallback_->onConnectionSetupError(QuicError( + QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), + "Priming error: Zero-RTT not available")); + } } return folly::unit; diff --git a/quic/client/test/BUCK b/quic/client/test/BUCK index c1235a1be..4e21dd9be 100644 --- a/quic/client/test/BUCK +++ b/quic/client/test/BUCK @@ -79,3 +79,19 @@ mvfst_cpp_test( "//quic/common/udpsocket/test:QuicAsyncUDPSocketMock", ], ) + +mvfst_cpp_test( + name = "QuicClientTransportLiteTest", + srcs = [ + "QuicClientTransportLiteTest.cpp", + ], + deps = [ + "//quic/api/test:mocks", + "//quic/client:client", + "//quic/client/test:mocks", + "//quic/common/events:folly_eventbase", + "//quic/common/events/test:QuicEventBaseMock", + "//quic/common/test:test_utils", + "//quic/common/udpsocket/test:QuicAsyncUDPSocketMock", + ], +) diff --git a/quic/client/test/CMakeLists.txt b/quic/client/test/CMakeLists.txt index 46d1308a4..acd84eff6 100644 --- a/quic/client/test/CMakeLists.txt +++ b/quic/client/test/CMakeLists.txt @@ -11,6 +11,7 @@ quic_add_test(TARGET ClientStateMachineTest SOURCES ClientStateMachineTest.cpp QuicClientTransportTest.cpp + QuicClientTransportLiteTest.cpp QuicConnectorTest.cpp DEPENDS Folly::folly diff --git a/quic/client/test/QuicClientTransportLiteTest.cpp b/quic/client/test/QuicClientTransportLiteTest.cpp new file mode 100644 index 000000000..ccae72951 --- /dev/null +++ b/quic/client/test/QuicClientTransportLiteTest.cpp @@ -0,0 +1,95 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace ::testing; + +namespace quic::test { + +class QuicClientTransportLiteMock : public QuicClientTransportLite { + public: + QuicClientTransportLiteMock( + std::shared_ptr evb, + std::unique_ptr socket, + std::shared_ptr handshakeFactory) + : QuicTransportBaseLite(evb, std::move(socket)), + QuicClientTransportLite(evb, nullptr, handshakeFactory) {} + + QuicClientConnectionState* getConn() { + return clientConn_; + } +}; + +class QuicClientTransportLiteTest : public Test { + public: + void SetUp() override { + qEvb_ = std::make_shared(&evb_); + auto socket = std::make_unique(); + sockPtr_ = socket.get(); + ON_CALL(*socket, setAdditionalCmsgsFunc(_)) + .WillByDefault(Return(folly::unit)); + ON_CALL(*socket, close()).WillByDefault(Return(folly::unit)); + ON_CALL(*socket, bind(_)).WillByDefault(Return(folly::unit)); + ON_CALL(*socket, connect(_)).WillByDefault(Return(folly::unit)); + ON_CALL(*socket, setReuseAddr(_)).WillByDefault(Return(folly::unit)); + ON_CALL(*socket, setReusePort(_)).WillByDefault(Return(folly::unit)); + ON_CALL(*socket, setRecvTos(_)).WillByDefault(Return(folly::unit)); + ON_CALL(*socket, getRecvTos()).WillByDefault(Return(false)); + ON_CALL(*socket, getGSO()).WillByDefault(Return(0)); + ON_CALL(*socket, setCmsgs(_)).WillByDefault(Return(folly::unit)); + ON_CALL(*socket, appendCmsgs(_)).WillByDefault(Return(folly::unit)); + auto mockFactory = std::make_shared(); + EXPECT_CALL(*mockFactory, _makeClientHandshake(_)) + .WillRepeatedly(Invoke( + [&](QuicClientConnectionState* conn) + -> std::unique_ptr { + return std::make_unique(conn); + })); + quicClient_ = std::make_shared( + qEvb_, std::move(socket), mockFactory); + quicClient_->getConn()->oneRttWriteCipher = test::createNoOpAead(); + quicClient_->getConn()->oneRttWriteHeaderCipher = + test::createNoOpHeaderCipher(); + ASSERT_FALSE(quicClient_->getState() + ->streamManager->setMaxLocalBidirectionalStreams(128) + .hasError()); + } + + void TearDown() override { + EXPECT_CALL(*sockPtr_, close()).WillRepeatedly(Return(folly::unit)); + quicClient_->closeNow(folly::none); + } + + folly::EventBase evb_; + std::shared_ptr qEvb_; + std::shared_ptr quicClient_; + MockConnectionSetupCallback mockConnectionSetupCallback_; + QuicAsyncUDPSocketMock* sockPtr_{nullptr}; +}; + +TEST_F(QuicClientTransportLiteTest, TestPriming) { + auto transportSettings = quicClient_->getTransportSettings(); + transportSettings.isPriming = true; + quicClient_->setTransportSettings(std::move(transportSettings)); + quicClient_->setConnectionSetupCallback(&mockConnectionSetupCallback_); + quicClient_->getConn()->zeroRttWriteCipher = test::createNoOpAead(); + + StreamId streamId = quicClient_->createBidirectionalStream().value(); + quicClient_->writeChain(streamId, folly::IOBuf::copyBuffer("test"), false); + EXPECT_CALL(mockConnectionSetupCallback_, onPrimingDataAvailable(_)); + evb_.loopOnce(EVLOOP_NONBLOCK); +} + +} // namespace quic::test diff --git a/quic/common/BufAccessor.cpp b/quic/common/BufAccessor.cpp index 94da7b52d..aed5d0577 100644 --- a/quic/common/BufAccessor.cpp +++ b/quic/common/BufAccessor.cpp @@ -30,7 +30,7 @@ BufPtr& BufAccessor::buf() { void BufAccessor::release(BufPtr buf) { CHECK(!buf_) << "Can't override existing buf"; CHECK(buf) << "Invalid BufPtr being released"; - CHECK_EQ(buf->capacity(), capacity_) + CHECK_GE(buf->capacity(), capacity_) << "BufPtr has wrong capacity, capacit_=" << capacity_ << ", buf capacity=" << buf->capacity(); CHECK(!buf->isChained()) << "Reject chained buf"; diff --git a/quic/common/test/BufAccessorTest.cpp b/quic/common/test/BufAccessorTest.cpp index 523c57360..44eca30c7 100644 --- a/quic/common/test/BufAccessorTest.cpp +++ b/quic/common/test/BufAccessorTest.cpp @@ -26,7 +26,7 @@ TEST(BufAccessor, BasicAccess) { TEST(BufAccessor, CapacityMatch) { BufAccessor accessor(1000); auto buf = accessor.obtain(); - buf = folly::IOBuf::create(2000); + buf = folly::IOBuf::create(500); EXPECT_DEATH(accessor.release(std::move(buf)), ""); } diff --git a/quic/loss/QuicLossFunctions.cpp b/quic/loss/QuicLossFunctions.cpp index 4e00a6730..95a3e8aa3 100644 --- a/quic/loss/QuicLossFunctions.cpp +++ b/quic/loss/QuicLossFunctions.cpp @@ -48,6 +48,10 @@ bool isPersistentCongestion( folly::Expected onPTOAlarm( QuicConnectionStateBase& conn) { VLOG(10) << __func__ << " " << conn; + if (conn.transportSettings.isPriming) { + // No retransmits in Priming mode + return folly::unit; + } QUIC_STATS(conn.statsCallback, onPTO); conn.lossState.ptoCount++; conn.lossState.totalPTOCount++; diff --git a/quic/state/StateData.h b/quic/state/StateData.h index fe739d835..1e519fba1 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -745,6 +745,9 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction { // Number of QUIC unique crypto frame received with initial package. uint16_t uniqueInitialCryptoFramesReceived{0}; + + // In priming mode data is written here instead of on the network + std::vector> primingData_; }; std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st); diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index 8df7d5a29..18bfd6a2e 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -472,6 +472,10 @@ struct TransportSettings { // Randomly skip one in N sequence numbers when sending packets. uint16_t skipOneInNPacketSequenceNumber{kSkipOneInNPacketSequenceNumber}; + + // When set to true it creates a transport for the sole purpose of + // retrieving 0-RTT data to a given destination + bool isPriming{false}; }; } // namespace quic