1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-07-30 14:43:05 +03:00

Support transport isPriming

Summary:
When the transport is in priming mode it saves all packets instead of writing them on the wire, and feeds them to a callback for the caller to get the data.

Meta:
Priming mode is used to get all packets before 1-RTT cipher is available, in order for them to get replayed later.

Reviewed By: kvtsoy

Differential Revision: D71290230

fbshipit-source-id: 230650cb1e5901069dda4ef850c9c724bf33b6be
This commit is contained in:
Paul Farcasanu
2025-05-07 09:32:06 -07:00
committed by Facebook GitHub Bot
parent 469da60216
commit c6a141bf80
13 changed files with 163 additions and 2 deletions

View File

@ -114,6 +114,14 @@ class QuicSocketLite {
* onTransportReady(), signifies full crypto handshake finished. * onTransportReady(), signifies full crypto handshake finished.
*/ */
virtual void onFullHandshakeDone() noexcept {} virtual void onFullHandshakeDone() noexcept {}
/**
* Client only.
* Called when the transport is in priming mode and 0-RTT packets are
* available
*/
virtual void onPrimingDataAvailable(
std::vector<quic::BufPtr>&& /* data */) noexcept {}
}; };
/** /**

View File

@ -1321,6 +1321,10 @@ QuicTransportBaseLite::writeSocketData() {
if (result.hasError()) { if (result.hasError()) {
return result; return result;
} }
if (conn_->transportSettings.isPriming && conn_->primingData_.size() > 0) {
connSetupCallback_->onPrimingDataAvailable(
std::move(conn_->primingData_));
}
if (closeState_ != CloseState::CLOSED) { if (closeState_ != CloseState::CLOSED) {
if (conn_->pendingEvents.closeTransport == true) { if (conn_->pendingEvents.closeTransport == true) {
return folly::makeUnexpected(QuicError( return folly::makeUnexpected(QuicError(

View File

@ -327,6 +327,14 @@ continuousMemoryBuildScheduleEncrypt(
auto encodedBodySize = encodedSize - headerLen; auto encodedBodySize = encodedSize - headerLen;
// Include previous packets back. // Include previous packets back.
packetBuf->prepend(prevSize); packetBuf->prepend(prevSize);
if (connection.transportSettings.isPriming && packetBuf) {
packetBuf->coalesce();
connection.bufAccessor->release(
folly::IOBuf::create(packetBuf->capacity()));
connection.primingData_.emplace_back(std::move(packetBuf));
return DataPathResult::makeWriteResult(
true, std::move(result.value()), encodedSize, encodedBodySize);
}
connection.bufAccessor->release(std::move(packetBuf)); connection.bufAccessor->release(std::move(packetBuf));
if (encodedSize > connection.udpSendPacketLen) { if (encodedSize > connection.udpSendPacketLen) {
VLOG(3) << "Quic sending pkt larger than limit, encodedSize=" VLOG(3) << "Quic sending pkt larger than limit, encodedSize="
@ -424,6 +432,12 @@ iobufChainBasedBuildScheduleEncrypt(
VLOG(3) << "Quic sending pkt larger than limit, encodedSize=" << encodedSize VLOG(3) << "Quic sending pkt larger than limit, encodedSize=" << encodedSize
<< " encodedBodySize=" << encodedBodySize; << " encodedBodySize=" << encodedBodySize;
} }
if (connection.transportSettings.isPriming && packetBuf) {
packetBuf->coalesce();
connection.primingData_.emplace_back(std::move(packetBuf));
return DataPathResult::makeWriteResult(
true, std::move(result.value()), encodedSize, encodedBodySize);
}
auto writeResult = ioBufBatch.write(std::move(packetBuf), encodedSize); auto writeResult = ioBufBatch.write(std::move(packetBuf), encodedSize);
if (writeResult.hasError()) { if (writeResult.hasError()) {
return folly::makeUnexpected(writeResult.error()); return folly::makeUnexpected(writeResult.error());

View File

@ -95,6 +95,11 @@ class MockConnectionSetupCallback : public QuicSocket::ConnectionSetupCallback {
MOCK_METHOD((void), onTransportReady, (), (noexcept)); MOCK_METHOD((void), onTransportReady, (), (noexcept));
MOCK_METHOD((void), onFirstPeerPacketProcessed, (), (noexcept)); MOCK_METHOD((void), onFirstPeerPacketProcessed, (), (noexcept));
MOCK_METHOD((void), onFullHandshakeDone, (), (noexcept)); MOCK_METHOD((void), onFullHandshakeDone, (), (noexcept));
MOCK_METHOD(
(void),
onPrimingDataAvailable,
(std::vector<quic::BufPtr>&&),
(noexcept));
}; };
class MockConnectionCallback : public QuicSocket::ConnectionCallback { class MockConnectionCallback : public QuicSocket::ConnectionCallback {

View File

@ -1201,6 +1201,13 @@ QuicClientTransportLite::startCryptoHandshake() {
clientPtr->connSetupCallback_->onTransportReady(); clientPtr->connSetupCallback_->onTransportReady();
} }
}); });
} else if (clientConn_->transportSettings.isPriming) {
auto clientPtr = dynamic_cast<QuicClientTransportLite*>(self.get());
if (clientPtr->connSetupCallback_) {
clientPtr->connSetupCallback_->onConnectionSetupError(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
"Priming error: Zero-RTT not available"));
}
} }
return folly::unit; return folly::unit;

View File

@ -79,3 +79,19 @@ mvfst_cpp_test(
"//quic/common/udpsocket/test:QuicAsyncUDPSocketMock", "//quic/common/udpsocket/test:QuicAsyncUDPSocketMock",
], ],
) )
mvfst_cpp_test(
name = "QuicClientTransportLiteTest",
srcs = [
"QuicClientTransportLiteTest.cpp",
],
deps = [
"//quic/api/test:mocks",
"//quic/client:client",
"//quic/client/test:mocks",
"//quic/common/events:folly_eventbase",
"//quic/common/events/test:QuicEventBaseMock",
"//quic/common/test:test_utils",
"//quic/common/udpsocket/test:QuicAsyncUDPSocketMock",
],
)

View File

@ -11,6 +11,7 @@ quic_add_test(TARGET ClientStateMachineTest
SOURCES SOURCES
ClientStateMachineTest.cpp ClientStateMachineTest.cpp
QuicClientTransportTest.cpp QuicClientTransportTest.cpp
QuicClientTransportLiteTest.cpp
QuicConnectorTest.cpp QuicConnectorTest.cpp
DEPENDS DEPENDS
Folly::folly Folly::folly

View File

@ -0,0 +1,95 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <gtest/gtest.h>
#include <quic/api/test/Mocks.h>
#include <quic/client/QuicClientTransport.h>
#include <quic/client/test/Mocks.h>
#include <quic/common/events/FollyQuicEventBase.h>
#include <quic/common/events/test/QuicEventBaseMock.h>
#include <quic/common/test/TestUtils.h>
#include <quic/common/udpsocket/test/QuicAsyncUDPSocketMock.h>
using namespace ::testing;
namespace quic::test {
class QuicClientTransportLiteMock : public QuicClientTransportLite {
public:
QuicClientTransportLiteMock(
std::shared_ptr<quic::FollyQuicEventBase> evb,
std::unique_ptr<QuicAsyncUDPSocketMock> socket,
std::shared_ptr<MockClientHandshakeFactory> handshakeFactory)
: QuicTransportBaseLite(evb, std::move(socket)),
QuicClientTransportLite(evb, nullptr, handshakeFactory) {}
QuicClientConnectionState* getConn() {
return clientConn_;
}
};
class QuicClientTransportLiteTest : public Test {
public:
void SetUp() override {
qEvb_ = std::make_shared<FollyQuicEventBase>(&evb_);
auto socket = std::make_unique<QuicAsyncUDPSocketMock>();
sockPtr_ = socket.get();
ON_CALL(*socket, setAdditionalCmsgsFunc(_))
.WillByDefault(Return(folly::unit));
ON_CALL(*socket, close()).WillByDefault(Return(folly::unit));
ON_CALL(*socket, bind(_)).WillByDefault(Return(folly::unit));
ON_CALL(*socket, connect(_)).WillByDefault(Return(folly::unit));
ON_CALL(*socket, setReuseAddr(_)).WillByDefault(Return(folly::unit));
ON_CALL(*socket, setReusePort(_)).WillByDefault(Return(folly::unit));
ON_CALL(*socket, setRecvTos(_)).WillByDefault(Return(folly::unit));
ON_CALL(*socket, getRecvTos()).WillByDefault(Return(false));
ON_CALL(*socket, getGSO()).WillByDefault(Return(0));
ON_CALL(*socket, setCmsgs(_)).WillByDefault(Return(folly::unit));
ON_CALL(*socket, appendCmsgs(_)).WillByDefault(Return(folly::unit));
auto mockFactory = std::make_shared<MockClientHandshakeFactory>();
EXPECT_CALL(*mockFactory, _makeClientHandshake(_))
.WillRepeatedly(Invoke(
[&](QuicClientConnectionState* conn)
-> std::unique_ptr<quic::ClientHandshake> {
return std::make_unique<MockClientHandshake>(conn);
}));
quicClient_ = std::make_shared<QuicClientTransportLiteMock>(
qEvb_, std::move(socket), mockFactory);
quicClient_->getConn()->oneRttWriteCipher = test::createNoOpAead();
quicClient_->getConn()->oneRttWriteHeaderCipher =
test::createNoOpHeaderCipher();
ASSERT_FALSE(quicClient_->getState()
->streamManager->setMaxLocalBidirectionalStreams(128)
.hasError());
}
void TearDown() override {
EXPECT_CALL(*sockPtr_, close()).WillRepeatedly(Return(folly::unit));
quicClient_->closeNow(folly::none);
}
folly::EventBase evb_;
std::shared_ptr<FollyQuicEventBase> qEvb_;
std::shared_ptr<QuicClientTransportLiteMock> quicClient_;
MockConnectionSetupCallback mockConnectionSetupCallback_;
QuicAsyncUDPSocketMock* sockPtr_{nullptr};
};
TEST_F(QuicClientTransportLiteTest, TestPriming) {
auto transportSettings = quicClient_->getTransportSettings();
transportSettings.isPriming = true;
quicClient_->setTransportSettings(std::move(transportSettings));
quicClient_->setConnectionSetupCallback(&mockConnectionSetupCallback_);
quicClient_->getConn()->zeroRttWriteCipher = test::createNoOpAead();
StreamId streamId = quicClient_->createBidirectionalStream().value();
quicClient_->writeChain(streamId, folly::IOBuf::copyBuffer("test"), false);
EXPECT_CALL(mockConnectionSetupCallback_, onPrimingDataAvailable(_));
evb_.loopOnce(EVLOOP_NONBLOCK);
}
} // namespace quic::test

View File

@ -30,7 +30,7 @@ BufPtr& BufAccessor::buf() {
void BufAccessor::release(BufPtr buf) { void BufAccessor::release(BufPtr buf) {
CHECK(!buf_) << "Can't override existing buf"; CHECK(!buf_) << "Can't override existing buf";
CHECK(buf) << "Invalid BufPtr being released"; CHECK(buf) << "Invalid BufPtr being released";
CHECK_EQ(buf->capacity(), capacity_) CHECK_GE(buf->capacity(), capacity_)
<< "BufPtr has wrong capacity, capacit_=" << capacity_ << "BufPtr has wrong capacity, capacit_=" << capacity_
<< ", buf capacity=" << buf->capacity(); << ", buf capacity=" << buf->capacity();
CHECK(!buf->isChained()) << "Reject chained buf"; CHECK(!buf->isChained()) << "Reject chained buf";

View File

@ -26,7 +26,7 @@ TEST(BufAccessor, BasicAccess) {
TEST(BufAccessor, CapacityMatch) { TEST(BufAccessor, CapacityMatch) {
BufAccessor accessor(1000); BufAccessor accessor(1000);
auto buf = accessor.obtain(); auto buf = accessor.obtain();
buf = folly::IOBuf::create(2000); buf = folly::IOBuf::create(500);
EXPECT_DEATH(accessor.release(std::move(buf)), ""); EXPECT_DEATH(accessor.release(std::move(buf)), "");
} }

View File

@ -48,6 +48,10 @@ bool isPersistentCongestion(
folly::Expected<folly::Unit, QuicError> onPTOAlarm( folly::Expected<folly::Unit, QuicError> onPTOAlarm(
QuicConnectionStateBase& conn) { QuicConnectionStateBase& conn) {
VLOG(10) << __func__ << " " << conn; VLOG(10) << __func__ << " " << conn;
if (conn.transportSettings.isPriming) {
// No retransmits in Priming mode
return folly::unit;
}
QUIC_STATS(conn.statsCallback, onPTO); QUIC_STATS(conn.statsCallback, onPTO);
conn.lossState.ptoCount++; conn.lossState.ptoCount++;
conn.lossState.totalPTOCount++; conn.lossState.totalPTOCount++;

View File

@ -745,6 +745,9 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
// Number of QUIC unique crypto frame received with initial package. // Number of QUIC unique crypto frame received with initial package.
uint16_t uniqueInitialCryptoFramesReceived{0}; uint16_t uniqueInitialCryptoFramesReceived{0};
// In priming mode data is written here instead of on the network
std::vector<std::unique_ptr<folly::IOBuf>> primingData_;
}; };
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st); std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st);

View File

@ -472,6 +472,10 @@ struct TransportSettings {
// Randomly skip one in N sequence numbers when sending packets. // Randomly skip one in N sequence numbers when sending packets.
uint16_t skipOneInNPacketSequenceNumber{kSkipOneInNPacketSequenceNumber}; uint16_t skipOneInNPacketSequenceNumber{kSkipOneInNPacketSequenceNumber};
// When set to true it creates a transport for the sole purpose of
// retrieving 0-RTT data to a given destination
bool isPriming{false};
}; };
} // namespace quic } // namespace quic