From 7f2cb16408f5623c0e5847a6c92c48b40c5276b0 Mon Sep 17 00:00:00 2001 From: Joseph Beshay Date: Thu, 9 May 2024 11:05:32 -0700 Subject: [PATCH] Refactor QuicClient and Server transports to process ReceivedUdpPackets with attached metadata Summary: ReceivedUdpPacket has attached metadata (currently timings and later in the stack, tos values). Rather than passing the metadata separately in the read path for the client and the server, this propagates the ReceivedUdpPacket further up so this metadata is easier to use in the rest of the stack. Reviewed By: mjoras Differential Revision: D54912162 fbshipit-source-id: c980d5b276704f5bba780ac16a380bbb4e5bedad --- quic/api/QuicTransportBase.cpp | 2 +- quic/api/test/QuicTransportBaseTest.cpp | 4 +- quic/client/QuicClientTransport.cpp | 48 ++++++++--------- quic/client/QuicClientTransport.h | 3 +- quic/common/BUCK | 2 +- quic/common/NetworkData.h | 18 ++++--- quic/common/test/TestUtils.cpp | 6 +++ quic/common/test/TestUtils.h | 3 ++ .../client/test/QuicClientTransportTestUtil.h | 8 ++- quic/server/QuicServerPacketRouter.cpp | 5 +- quic/server/QuicServerTransport.cpp | 4 +- quic/server/QuicServerWorker.cpp | 28 +++++----- quic/server/QuicServerWorker.h | 5 +- quic/server/state/ServerStateMachine.cpp | 11 ++-- quic/server/test/QuicServerTest.cpp | 54 +++++++++---------- 15 files changed, 108 insertions(+), 93 deletions(-) diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 6b5d9ecdd..2c48f78e3 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1876,7 +1876,7 @@ void QuicTransportBase::onNetworkData( SocketObserverInterface::PacketsReceivedEvent::ReceivedUdpPacket:: Builder() .setPacketReceiveTime(packet.timings.receiveTimePoint) - .setPacketNumBytes(packet.buf->computeChainDataLength()) + .setPacketNumBytes(packet.buf.chainLength()) .build()); } diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 757337e3f..043fd21ea 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -273,10 +273,10 @@ class TestQuicTransport void onReadData(const folly::SocketAddress&, ReceivedUdpPacket&& udpPacket) override { - if (!udpPacket.buf) { + if (udpPacket.buf.empty()) { return; } - folly::io::Cursor cursor(udpPacket.buf.get()); + folly::io::Cursor cursor(udpPacket.buf.front()); while (!cursor.isAtEnd()) { // create server chosen connId with processId = 0 and workerId = 0 ServerConnectionIdParams params(0, 0, 0); diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index 87c1d130b..4fb6e47aa 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -123,8 +123,7 @@ void QuicClientTransport::processUdpPacket( ReceivedUdpPacket&& udpPacket) { // Process the arriving UDP packet, which may have coalesced QUIC packets. { - BufQueue udpData; - udpData.append(std::move(udpPacket.buf)); + BufQueue& udpData = udpPacket.buf; if (!conn_->version) { // We only check for version negotiation packets before the version @@ -145,7 +144,7 @@ void QuicClientTransport::processUdpPacket( for (uint16_t processedPackets = 0; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets; processedPackets++) { - processUdpPacketData(peer, udpPacket.timings, udpData); + processUdpPacketData(peer, udpPacket); } VLOG_IF(4, !udpData.empty()) << "Leaving " << udpData.chainLength() @@ -156,23 +155,23 @@ void QuicClientTransport::processUdpPacket( // Process any deferred pending 1RTT and handshake packets if we have keys. if (conn_->readCodec->getOneRttReadCipher() && !clientConn_->pendingOneRttData.empty()) { - BufQueue pendingPacket; - for (auto& pendingData : clientConn_->pendingOneRttData) { - pendingPacket.append(std::move(pendingData.udpPacket.buf)); - processUdpPacketData( - pendingData.peer, pendingData.udpPacket.timings, pendingPacket); - pendingPacket.move(); + for (auto& pendingPacket : clientConn_->pendingOneRttData) { + // The first loop should try to process any leftover data in the incoming + // buffer. + pendingPacket.udpPacket.buf.append(udpPacket.buf.move()); + + processUdpPacketData(pendingPacket.peer, pendingPacket.udpPacket); } clientConn_->pendingOneRttData.clear(); } if (conn_->readCodec->getHandshakeReadCipher() && !clientConn_->pendingHandshakeData.empty()) { - BufQueue pendingPacket; - for (auto& pendingData : clientConn_->pendingHandshakeData) { - pendingPacket.append(std::move(pendingData.udpPacket.buf)); - processUdpPacketData( - pendingData.peer, pendingData.udpPacket.timings, pendingPacket); - pendingPacket.move(); + for (auto& pendingPacket : clientConn_->pendingHandshakeData) { + // The first loop should try to process any leftover data in the incoming + // buffer. + pendingPacket.udpPacket.buf.append(udpPacket.buf.move()); + + processUdpPacketData(pendingPacket.peer, pendingPacket.udpPacket); } clientConn_->pendingHandshakeData.clear(); } @@ -180,14 +179,13 @@ void QuicClientTransport::processUdpPacket( void QuicClientTransport::processUdpPacketData( const folly::SocketAddress& peer, - const ReceivedUdpPacket::Timings& udpPacketTimings, - BufQueue& udpPacketData) { - auto packetSize = udpPacketData.chainLength(); + ReceivedUdpPacket& udpPacket) { + auto packetSize = udpPacket.buf.chainLength(); if (packetSize == 0) { return; } auto parsedPacket = conn_->readCodec->parsePacket( - udpPacketData, conn_->ackStates, conn_->clientConnectionId->size()); + udpPacket.buf, conn_->ackStates, conn_->clientConnectionId->size()); StatelessReset* statelessReset = parsedPacket.statelessReset(); if (statelessReset) { const auto& token = clientConn_->statelessResetToken; @@ -270,7 +268,7 @@ void QuicClientTransport::processUdpPacketData( : clientConn_->pendingHandshakeData; pendingData.emplace_back( ReceivedUdpPacket( - std::move(cipherUnavailable->packet), udpPacketTimings), + std::move(cipherUnavailable->packet), udpPacket.timings), peer); if (conn_->qLogger) { conn_->qLogger->addPacketBuffered( @@ -375,7 +373,7 @@ void QuicClientTransport::processUdpPacketData( // Add the packet to the AckState associated with the packet number space. auto& ackState = getAckState(*conn_, pnSpace); uint64_t distanceFromExpectedPacketNum = - addPacketToAckState(*conn_, ackState, packetNum, udpPacketTimings); + addPacketToAckState(*conn_, ackState, packetNum, udpPacket.timings); if (distanceFromExpectedPacketNum > 0) { QUIC_STATS(conn_->statsCallback, onOutOfOrderPacketReceived); } @@ -468,7 +466,7 @@ void QuicClientTransport::processUdpPacketData( ackedPacketVisitor, ackedFrameVisitor, markPacketLoss, - udpPacketTimings.receiveTimePoint)); + udpPacket.timings.receiveTimePoint)); break; } case QuicFrame::Type::RstStreamFrame: { @@ -623,7 +621,7 @@ void QuicClientTransport::processUdpPacketData( // Datagram isn't retransmittable. But we would like to ack them early. // So, make Datagram frames count towards ack policy pktHasRetransmittableData = true; - handleDatagram(*conn_, frame, udpPacketTimings.receiveTimePoint); + handleDatagram(*conn_, frame, udpPacket.timings.receiveTimePoint); break; } case QuicFrame::Type::ImmediateAckFrame: { @@ -1460,10 +1458,10 @@ void QuicClientTransport::onNotifyDataAvailable( // track the received packets for (const auto& packet : networkData.getPackets()) { - if (!packet.buf) { + if (packet.buf.empty()) { continue; } - auto len = packet.buf->computeChainDataLength(); + auto len = packet.buf.chainLength(); maybeQlogDatagram(len); totalPackets++; totalPacketLen += len; diff --git a/quic/client/QuicClientTransport.h b/quic/client/QuicClientTransport.h index ad31a50dd..1cb78c52b 100644 --- a/quic/client/QuicClientTransport.h +++ b/quic/client/QuicClientTransport.h @@ -287,8 +287,7 @@ class QuicClientTransport */ void processUdpPacketData( const folly::SocketAddress& peer, - const ReceivedUdpPacket::Timings& udpPacketTimings, - BufQueue& udpPacketData); + ReceivedUdpPacket& udpPacket); void startCryptoHandshake(); diff --git a/quic/common/BUCK b/quic/common/BUCK index ba8b0d423..48026597a 100644 --- a/quic/common/BUCK +++ b/quic/common/BUCK @@ -134,9 +134,9 @@ mvfst_cpp_library( "NetworkData.h", ], exported_deps = [ + ":buf_util", ":time_points", "//folly:optional", - "//folly/io:iobuf", "//quic:constants", ], ) diff --git a/quic/common/NetworkData.h b/quic/common/NetworkData.h index 8a861d3d6..604ac4515 100644 --- a/quic/common/NetworkData.h +++ b/quic/common/NetworkData.h @@ -8,8 +8,8 @@ #pragma once #include -#include #include +#include #include #include @@ -59,7 +59,7 @@ struct ReceivedUdpPacket { ReceivedUdpPacket(Buf&& bufIn, Timings timingsIn) : buf(std::move(bufIn)), timings(std::move(timingsIn)) {} - Buf buf; + BufQueue buf; Timings timings; }; @@ -74,6 +74,12 @@ struct NetworkData { } } + explicit NetworkData(ReceivedUdpPacket&& udpPacket) + : receiveTimePoint_(udpPacket.timings.receiveTimePoint) { + totalData_ = udpPacket.buf.chainLength(); + packets_.push_back(std::move(udpPacket)); + } + NetworkData( std::vector&& packetBufs, const TimePoint& receiveTimePointIn) @@ -92,7 +98,7 @@ struct NetworkData { totalData_([this]() { size_t result = 0; for (const auto& packet : packets_) { - result += packet.buf->computeChainDataLength(); + result += packet.buf.chainLength(); } return result; }()) {} @@ -104,7 +110,7 @@ struct NetworkData { void addPacket(ReceivedUdpPacket&& packetIn) { packets_.emplace_back(std::move(packetIn)); packets_.back().timings.receiveTimePoint = receiveTimePoint_; - totalData_ += packets_.back().buf->computeChainDataLength(); + totalData_ += packets_.back().buf.chainLength(); } [[nodiscard]] const std::vector& getPackets() const { @@ -134,9 +140,9 @@ struct NetworkData { std::unique_ptr buf; for (auto& packet : packets_) { if (buf) { - buf->prependChain(std::move(packet.buf)); + buf->prependChain(packet.buf.move()); } else { - buf = std::move(packet.buf); + buf = packet.buf.move(); } } return buf; diff --git a/quic/common/test/TestUtils.cpp b/quic/common/test/TestUtils.cpp index 996ec5ec1..b477208d8 100644 --- a/quic/common/test/TestUtils.cpp +++ b/quic/common/test/TestUtils.cpp @@ -399,6 +399,12 @@ Buf packetToBuf(const RegularQuicPacketBuilder::Packet& packet) { return packetBuf; } +ReceivedUdpPacket packetToReceivedUdpPacket( + const RegularQuicPacketBuilder::Packet& writePacket) { + ReceivedUdpPacket packet(packetToBuf(writePacket)); + return packet; +} + Buf packetToBufCleartext( RegularQuicPacketBuilder::Packet& packet, const Aead& cleartextCipher, diff --git a/quic/common/test/TestUtils.h b/quic/common/test/TestUtils.h index 3c8dfdd38..acf71b2f6 100644 --- a/quic/common/test/TestUtils.h +++ b/quic/common/test/TestUtils.h @@ -117,6 +117,9 @@ RegularQuicPacketBuilder::Packet createCryptoPacket( Buf packetToBuf(const RegularQuicPacketBuilder::Packet& packet); +ReceivedUdpPacket packetToReceivedUdpPacket( + const RegularQuicPacketBuilder::Packet& packetIn); + Buf packetToBufCleartext( RegularQuicPacketBuilder::Packet& packet, const Aead& cleartextCipher, diff --git a/quic/fizz/client/test/QuicClientTransportTestUtil.h b/quic/fizz/client/test/QuicClientTransportTestUtil.h index bb68bc1b8..42b3001c5 100644 --- a/quic/fizz/client/test/QuicClientTransportTestUtil.h +++ b/quic/fizz/client/test/QuicClientTransportTestUtil.h @@ -737,7 +737,9 @@ class QuicClientTransportTestBase : public virtual testing::Test { folly::SocketAddress* peer = nullptr) { for (const auto& packet : data.getPackets()) { deliverDataWithoutErrorCheck( - peer == nullptr ? serverAddr : *peer, packet.buf->coalesce(), writes); + peer == nullptr ? serverAddr : *peer, + packet.buf.clone()->coalesce(), + writes); } } @@ -773,7 +775,9 @@ class QuicClientTransportTestBase : public virtual testing::Test { folly::SocketAddress* peer = nullptr) { for (const auto& packet : data.getPackets()) { deliverData( - peer == nullptr ? serverAddr : *peer, packet.buf->coalesce(), writes); + peer == nullptr ? serverAddr : *peer, + packet.buf.clone()->coalesce(), + writes); } } diff --git a/quic/server/QuicServerPacketRouter.cpp b/quic/server/QuicServerPacketRouter.cpp index a25a91ae1..84bef2c1f 100644 --- a/quic/server/QuicServerPacketRouter.cpp +++ b/quic/server/QuicServerPacketRouter.cpp @@ -242,10 +242,11 @@ void TakeoverPacketHandler::processForwardedPacket( TimePoint clientPacketReceiveTime(tick); data->trimStart(cursor - data.get()); QUIC_STATS(worker_->getStatsCallback(), onForwardedPacketProcessed); + ReceivedUdpPacket packet(std::move(data)); + packet.timings.receiveTimePoint = clientPacketReceiveTime; worker_->handleNetworkData( peerAddress, - std::move(data), - clientPacketReceiveTime, + packet, /* isForwardedData */ true); } diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index df0407466..8a4a09010 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -519,9 +519,7 @@ void QuicServerTransport::processPendingData(bool async) { for (auto& pendingPacket : *pendingData) { serverPtr->onNetworkData( pendingPacket.peer, - NetworkData( - std::move(pendingPacket.udpPacket.buf), - pendingPacket.udpPacket.timings.receiveTimePoint)); + NetworkData(std::move(pendingPacket.udpPacket))); if (serverPtr->closeState_ == CloseState::CLOSED) { // The pending data could potentially contain a connection close, or // the app could have triggered a connection close with an error. It diff --git a/quic/server/QuicServerWorker.cpp b/quic/server/QuicServerWorker.cpp index 34be7f14a..4a4caf762 100644 --- a/quic/server/QuicServerWorker.cpp +++ b/quic/server/QuicServerWorker.cpp @@ -344,7 +344,9 @@ void QuicServerWorker::onDataAvailable( data->append(len); QUIC_STATS(statsCallback_, onPacketReceived); QUIC_STATS(statsCallback_, onRead, len); - handleNetworkData(client, std::move(data), packetReceiveTime); + ReceivedUdpPacket udpPacket(std::move(data)); + udpPacket.timings.receiveTimePoint = packetReceiveTime; + handleNetworkData(client, udpPacket); } else { // if we receive a truncated packet // we still need to consider the prev valid ones @@ -353,7 +355,6 @@ void QuicServerWorker::onDataAvailable( if (truncated) { len -= len % params.gro; } - data->append(len); QUIC_STATS(statsCallback_, onPacketReceived); QUIC_STATS(statsCallback_, onRead, len); @@ -366,7 +367,9 @@ void QuicServerWorker::onDataAvailable( // start at offset, use all the remaining data data->trimStart(offset); DCHECK_EQ(data->length(), remaining); - handleNetworkData(client, std::move(data), packetReceiveTime); + ReceivedUdpPacket udpPacket(std::move(data)); + udpPacket.timings.receiveTimePoint = packetReceiveTime; + handleNetworkData(client, udpPacket); break; } auto tmp = data->cloneOne(); @@ -378,15 +381,16 @@ void QuicServerWorker::onDataAvailable( DCHECK_EQ(tmp->length(), params.gro); offset += params.gro; remaining -= params.gro; - handleNetworkData(client, std::move(tmp), packetReceiveTime); + ReceivedUdpPacket udpPacket(std::move(tmp)); + udpPacket.timings.receiveTimePoint = packetReceiveTime; + handleNetworkData(client, udpPacket); } } } void QuicServerWorker::handleNetworkData( const folly::SocketAddress& client, - Buf data, - const TimePoint& packetReceiveTime, + ReceivedUdpPacket& udpPacket, bool isForwardedData) noexcept { // if packet drop reason is set, invoke stats cb accordingly auto packetDropReason = PacketDropReason::NONE; @@ -398,7 +402,7 @@ void QuicServerWorker::handleNetworkData( try { // check error conditions for packet drop & early return - folly::io::Cursor cursor(data.get()); + folly::io::Cursor cursor(udpPacket.buf.front()); if (shutdown_) { VLOG(4) << "Packet received after shutdown, dropping"; packetDropReason = PacketDropReason::SERVER_SHUTDOWN; @@ -433,7 +437,7 @@ void QuicServerWorker::handleNetworkData( return forwardNetworkData( client, std::move(routingData), - NetworkData(std::move(data), packetReceiveTime), + NetworkData(std::move(udpPacket)), folly::none, /* quicVersion */ isForwardedData); } @@ -452,7 +456,7 @@ void QuicServerWorker::handleNetworkData( } if (maybeSendVersionNegotiationPacketOrDrop( - client, isInitial, invariant, data->computeChainDataLength())) { + client, isInitial, invariant, udpPacket.buf.chainLength())) { return; } @@ -473,12 +477,12 @@ void QuicServerWorker::handleNetworkData( return forwardNetworkData( client, std::move(routingData), - NetworkData(std::move(data), packetReceiveTime), + NetworkData(std::move(udpPacket)), invariant.version, isForwardedData); } - if (!tryHandlingAsHealthCheck(client, *data)) { + if (!tryHandlingAsHealthCheck(client, *udpPacket.buf.front())) { VLOG(6) << "Failed to parse long header"; packetDropReason = PacketDropReason::PARSE_ERROR_LONG_HEADER; } @@ -900,7 +904,7 @@ void QuicServerWorker::dispatchPacketData( // If there is a token present, decrypt it (could be either a retry // token or a new token) - folly::io::Cursor cursor(networkData.getPackets().front().buf.get()); + folly::io::Cursor cursor(networkData.getPackets().front().buf.front()); auto maybeEncryptedToken = maybeGetEncryptedToken(cursor); bool hasTokenSecret = transportSettings_.retryTokenSecret.hasValue(); diff --git a/quic/server/QuicServerWorker.h b/quic/server/QuicServerWorker.h index f8b88a7e5..d33baed1d 100644 --- a/quic/server/QuicServerWorker.h +++ b/quic/server/QuicServerWorker.h @@ -452,12 +452,11 @@ class QuicServerWorker : public FollyAsyncUDPSocketAlias::ReadCallback, FollyAsyncUDPSocketAlias::ReadCallback* getTakeoverHandlerCallback() { return takeoverCB_.get(); } - + // Handle the network data for a udp packet // public so that it can be called by tests as well. void handleNetworkData( const folly::SocketAddress& client, - Buf data, - const TimePoint& receiveTime, + ReceivedUdpPacket& packet, bool isForwardedData = false) noexcept; /** diff --git a/quic/server/state/ServerStateMachine.cpp b/quic/server/state/ServerStateMachine.cpp index de24638d6..92c3502ec 100644 --- a/quic/server/state/ServerStateMachine.cpp +++ b/quic/server/state/ServerStateMachine.cpp @@ -731,15 +731,14 @@ void onServerReadDataFromOpen( ServerEvents::ReadData& readData) { CHECK_EQ(conn.state, ServerState::Open); // Don't bother parsing if the data is empty. - if (!readData.udpPacket.buf || - readData.udpPacket.buf->computeChainDataLength() == 0) { + if (readData.udpPacket.buf.empty()) { return; } bool firstPacketFromPeer = false; if (!conn.readCodec) { firstPacketFromPeer = true; - folly::io::Cursor cursor(readData.udpPacket.buf.get()); + folly::io::Cursor cursor(readData.udpPacket.buf.front()); auto initialByte = cursor.readBE(); auto parsedLongHeader = parseLongHeaderInvariant(initialByte, cursor); if (!parsedLongHeader) { @@ -848,8 +847,7 @@ void onServerReadDataFromOpen( initialDestinationConnectionId, version); conn.peerAddress = conn.originalPeerAddress; } - BufQueue udpData; - udpData.append(std::move(readData.udpPacket.buf)); + BufQueue& udpData = readData.udpPacket.buf; uint64_t processedPacketsTotal = 0; for (uint16_t processedPackets = 0; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets; @@ -1385,8 +1383,7 @@ void onServerReadDataFromClosed( QuicServerConnectionState& conn, ServerEvents::ReadData& readData) { CHECK_EQ(conn.state, ServerState::Closed); - BufQueue udpData; - udpData.append(std::move(readData.udpPacket.buf)); + BufQueue& udpData = readData.udpPacket.buf; auto packetSize = udpData.empty() ? 0 : udpData.chainLength(); if (!conn.readCodec) { // drop data. We closed before we even got the first packet. This is diff --git a/quic/server/test/QuicServerTest.cpp b/quic/server/test/QuicServerTest.cpp index 9c7c484b5..33cc868dd 100644 --- a/quic/server/test/QuicServerTest.cpp +++ b/quic/server/test/QuicServerTest.cpp @@ -49,7 +49,7 @@ namespace test { MATCHER_P(NetworkDataMatches, networkData, "") { for (size_t i = 0; i < arg.getPackets().size(); ++i) { folly::IOBufEqualTo eq; - bool equals = eq(*arg.getPackets()[i].buf, networkData); + bool equals = eq(*arg.getPackets()[i].buf.front(), networkData); if (equals) { return true; } @@ -988,8 +988,8 @@ TEST_F(QuicServerWorkerTest, BlockedSourcePort) { while (builder.remainingSpaceInPkt() > 0) { writeFrame(PaddingFrame(), builder); } - auto packet = packetToBuf(std::move(builder).buildPacket()); - worker_->handleNetworkData(blockedSrcPort, std::move(packet), Clock::now()); + auto packet = packetToReceivedUdpPacket(std::move(builder).buildPacket()); + worker_->handleNetworkData(blockedSrcPort, packet); eventbase_.loopIgnoreKeepAlive(); } @@ -1007,8 +1007,8 @@ TEST_F(QuicServerWorkerTest, ZeroLengthConnectionId) { while (builder.remainingSpaceInPkt() > 0) { writeFrame(PaddingFrame(), builder); } - auto packet = packetToBuf(std::move(builder).buildPacket()); - worker_->handleNetworkData(kClientAddr, std::move(packet), Clock::now()); + auto packet = packetToReceivedUdpPacket(std::move(builder).buildPacket()); + worker_->handleNetworkData(kClientAddr, packet); eventbase_.loopIgnoreKeepAlive(); } @@ -1022,11 +1022,11 @@ TEST_F(QuicServerWorkerTest, ClientInitialCounting) { RegularQuicPacketBuilder initialBuilder( kDefaultUDPSendPacketLen, std::move(initialHeader), 0); initialBuilder.encodePacketHeader(); - auto initialPacket = packetToBuf(std::move(initialBuilder).buildPacket()); + auto initialPacket = + packetToReceivedUdpPacket((std::move(initialBuilder).buildPacket())); EXPECT_CALL(*quicStats_, onClientInitialReceived(QuicVersion::MVFST)) .Times(1); - worker_->handleNetworkData( - kClientAddr, std::move(initialPacket), Clock::now()); + worker_->handleNetworkData(kClientAddr, initialPacket); eventbase_.loopIgnoreKeepAlive(); // Initial with any packet number should also increate the counting @@ -1036,12 +1036,11 @@ TEST_F(QuicServerWorkerTest, ClientInitialCounting) { RegularQuicPacketBuilder initialBuilderBigNum( kDefaultUDPSendPacketLen, std::move(initialHeaderBigNum), 0); initialBuilderBigNum.encodePacketHeader(); - auto initialPacketBigNum = - packetToBuf(std::move(initialBuilderBigNum).buildPacket()); + auto initialPacketBigNum = packetToReceivedUdpPacket( + (std::move(initialBuilderBigNum).buildPacket())); EXPECT_CALL(*quicStats_, onClientInitialReceived(QuicVersion::MVFST)) .Times(1); - worker_->handleNetworkData( - kClientAddr, std::move(initialPacketBigNum), Clock::now()); + worker_->handleNetworkData(kClientAddr, initialPacketBigNum); eventbase_.loopIgnoreKeepAlive(); LongHeader handshakeHeader( @@ -1049,10 +1048,10 @@ TEST_F(QuicServerWorkerTest, ClientInitialCounting) { RegularQuicPacketBuilder handshakeBuilder( kDefaultUDPSendPacketLen, std::move(handshakeHeader), 0); handshakeBuilder.encodePacketHeader(); - auto handshakePacket = packetToBuf(std::move(handshakeBuilder).buildPacket()); + auto handshakePacket = + packetToReceivedUdpPacket((std::move(handshakeBuilder).buildPacket())); EXPECT_CALL(*quicStats_, onClientInitialReceived(_)).Times(0); - worker_->handleNetworkData( - kClientAddr, std::move(handshakePacket), Clock::now()); + worker_->handleNetworkData(kClientAddr, handshakePacket); eventbase_.loopIgnoreKeepAlive(); } @@ -1073,8 +1072,8 @@ TEST_F(QuicServerWorkerTest, ConnectionIdTooShort) { while (builder.remainingSpaceInPkt() > 0) { writeFrame(PaddingFrame(), builder); } - auto packet = packetToBuf(std::move(builder).buildPacket()); - worker_->handleNetworkData(kClientAddr, std::move(packet), Clock::now()); + auto packet = packetToReceivedUdpPacket((std::move(builder).buildPacket())); + worker_->handleNetworkData(kClientAddr, packet); eventbase_.loopIgnoreKeepAlive(); } @@ -1203,8 +1202,8 @@ TEST_F(QuicServerWorkerTest, PacketAfterShutdown) { RegularQuicPacketBuilder builder( kDefaultUDPSendPacketLen, std::move(header), 0 /* largestAcked */); builder.encodePacketHeader(); - auto packet = packetToBuf(std::move(builder).buildPacket()); - worker_->handleNetworkData(kClientAddr, std::move(packet), Clock::now()); + auto packet = packetToReceivedUdpPacket((std::move(builder).buildPacket())); + worker_->handleNetworkData(kClientAddr, packet); eventbase_.terminateLoopSoon(); t.join(); } @@ -1984,7 +1983,7 @@ TEST_F(QuicServerWorkerTakeoverTest, QuicServerTakeoverProcessForwardedPkt) { // the original data should be extracted after processing takeover // protocol related information EXPECT_EQ(networkData->getPackets().size(), 1); - EXPECT_TRUE(eq(*data, *(networkData->getPackets()[0].buf))); + EXPECT_TRUE(eq(*data, *(networkData->getPackets()[0].buf.front()))); EXPECT_TRUE(isForwardedData); }; EXPECT_CALL(*takeoverWorkerCb_, routeDataToWorkerLong(_, _, _, _, _)) @@ -2182,7 +2181,7 @@ class QuicServerTest : public Test { auto, const auto& networkData) mutable { EXPECT_GT(networkData.getPackets().size(), 0); EXPECT_TRUE(folly::IOBufEqualTo()( - *networkData.getPackets()[0].buf, *expected)); + *networkData.getPackets()[0].buf.front(), *expected)); std::unique_lock lg(m); calledOnNetworkData = true; cv.notify_one(); @@ -2349,7 +2348,7 @@ TEST_F(QuicServerTest, RouteDataFromDifferentThread) { .WillOnce(Invoke([&](auto, const auto& networkData) { EXPECT_GT(networkData.getPackets().size(), 0); EXPECT_TRUE(folly::IOBufEqualTo()( - *networkData.getPackets()[0].buf, *initialData)); + *networkData.getPackets()[0].buf.front(), *initialData)); })); static_cast(server_.get()) @@ -2454,7 +2453,7 @@ class QuicServerTakeoverTest : public Test { [&, expected = data.get()](auto, const auto& networkData) { EXPECT_GT(networkData.getPackets().size(), 0); EXPECT_TRUE(folly::IOBufEqualTo()( - *networkData.getPackets()[0].buf, *expected)); + *networkData.getPackets()[0].buf.front(), *expected)); baton.post(); })); return transport; @@ -2562,7 +2561,7 @@ class QuicServerTakeoverTest : public Test { Invoke([&, expected = data.get()](auto, const auto& networkData) { EXPECT_GT(networkData.getPackets().size(), 0); EXPECT_TRUE(folly::IOBufEqualTo()( - *networkData.getPackets()[0].buf, *expected)); + *networkData.getPackets()[0].buf.front(), *expected)); b1.post(); })); // new quic server receives the packet and forwards it @@ -3029,7 +3028,7 @@ TEST_F(QuicServerTest, ZeroRttPacketRoute) { [&, expected = data.get()](auto, const auto& networkData) { EXPECT_GT(networkData.getPackets().size(), 0); EXPECT_TRUE(folly::IOBufEqualTo()( - *networkData.getPackets()[0].buf, *expected)); + *networkData.getPackets()[0].buf.front(), *expected)); b.post(); })); return transport; @@ -3072,7 +3071,8 @@ TEST_F(QuicServerTest, ZeroRttPacketRoute) { const NetworkData& networkData) noexcept { EXPECT_GT(networkData.getPackets().size(), 0); EXPECT_EQ(peer, reader->getSocket().address()); - EXPECT_TRUE(folly::IOBufEqualTo()(*data, *networkData.getPackets()[0].buf)); + EXPECT_TRUE( + folly::IOBufEqualTo()(*data, *networkData.getPackets()[0].buf.front())); b1.post(); }; EXPECT_CALL(*transport, onNetworkData(_, _)).WillOnce(Invoke(verifyZeroRtt)); @@ -3126,7 +3126,7 @@ TEST_F(QuicServerTest, ZeroRttBeforeInitial) { .Times(2) .WillRepeatedly(Invoke([&](auto, auto& networkData) { for (const auto& packet : networkData.getPackets()) { - receivedData.emplace_back(packet.buf->clone()); + receivedData.emplace_back(packet.buf.clone()); } if (receivedData.size() == 2) { b.post();