From 04facac67d6f5cf69b8d35468b7b9569dda2946d Mon Sep 17 00:00:00 2001 From: Brandon Schlinker Date: Sun, 5 Nov 2023 19:58:46 -0800 Subject: [PATCH] Cleanup and modularize receive path, improve timestamp support [15/x] Summary: This diff drops `NetworkDataSingle` in favor of `ReceivedPacket`. The latter contains a `ReceivedPacket::Timings` field that has the same `receiveTimePoint` currently in `NetworkDataSingle`, while also providing other useful signals. -- This diff is part of a larger stack focused on the following: - **Cleaning up client and server UDP packet receive paths while improving testability.** We currently have multiple receive paths for client and server. Capabilities vary significantly and there are few tests. For instance: - The server receive path supports socket RX timestamps, abet incorrectly in that it does not store timestamp per packet. In comparison, the client receive path does not currently support socket RX timestamps, although the code in `QuicClientTransport::recvmsg` and `QuicClientTransport::recvmmsg` makes reference to socket RX timestamps, making it confusing to understand the capabilities available when tracing through the code. This complicates the tests in `QuicTypedTransportTests`, as we have to disable test logic that depends on socket RX timestamps for client tests. - The client currently has three receive paths, and none of them are well tested. - **Modularize and abstract components in the receive path.** This will make it easier to mock/fake the UDP socket and network layers. - `QuicClientTransport` and `QuicServerTransport` currently contain UDP socket handling logic that operates over lower layer primitives such `cmsg` and `io_vec` (see `QuicClientTransport::recvmmsg` and `...::recvmsg` as examples). - Because this UDP socket handling logic is inside of the mvfst transport implementations, it is difficult to test this logic in isolation and mock/fake the underlying socket and network layers. For instance, injecting a user space network emulator that operates at the socket layer would require faking `folly::AsyncUDPSocket`, which is non-trivial given that `AsyncUDPSocket` does not abstract away intricacies arising from the aforementioned lower layer primitives. - By shifting this logic into an intermediate layer between the transport and the underlying UDP socket, it will be easier to mock out the UDP socket layer when testing functionality at higher layers, and inject fake components when we want to emulate the network between a mvfst client and server. It will also be easier for us to have unit tests focused on testing interactions between the UDP socket implementation and this intermediate layer. - **Improving receive path timestamping.** We only record a single timestamp per `NetworkData` at the moment, but (1) it is possible for a `NetworkData` to have multiple packets, each with their own timestamps, and (2) we should be able to record both userspace and socket timestamps Reviewed By: silver23arrow Differential Revision: D48739219 fbshipit-source-id: fc2cdb7b425d68c729dd3bec00b6c6ff3c4bf8ec --- quic/api/QuicTransportBase.cpp | 2 +- quic/api/QuicTransportBase.h | 2 +- quic/api/test/QuicTransportBaseTest.cpp | 8 +++---- quic/api/test/TestQuicTransport.h | 4 ++-- quic/client/QuicClientTransport.cpp | 27 +++++++++--------------- quic/client/QuicClientTransport.h | 7 +++--- quic/client/state/ClientStateMachine.h | 8 +++---- quic/common/NetworkData.h | 25 ---------------------- quic/server/QuicServerTransport.cpp | 8 +++---- quic/server/QuicServerTransport.h | 5 ++--- quic/server/state/ServerStateMachine.cpp | 26 +++++++++-------------- quic/server/state/ServerStateMachine.h | 2 +- 12 files changed, 41 insertions(+), 83 deletions(-) diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 8a945b767..590797b56 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1905,7 +1905,7 @@ void QuicTransportBase::onNetworkData( auto packets = std::move(networkData).movePackets(); for (auto& packet : packets) { - onReadData(peer, NetworkDataSingle(std::move(packet))); + onReadData(peer, std::move(packet)); if (conn_->peerConnectionError) { closeImpl(QuicError( QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed")); diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index 862b62342..a0ee7c451 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -309,7 +309,7 @@ class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver { */ virtual void onReadData( const folly::SocketAddress& peer, - NetworkDataSingle&& networkData) = 0; + ReceivedPacket&& udpPacket) = 0; /** * Invoked when we have to write some data to the wire. diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 23c37b045..8f7ec954b 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -263,12 +263,12 @@ class TestQuicTransport return lossTimeout_.getTimeRemaining(); } - void onReadData(const folly::SocketAddress&, NetworkDataSingle&& data) + void onReadData(const folly::SocketAddress&, ReceivedPacket&& udpPacket) override { - if (!data.packet.buf) { + if (!udpPacket.buf) { return; } - folly::io::Cursor cursor(data.packet.buf.get()); + folly::io::Cursor cursor(udpPacket.buf.get()); while (!cursor.isAtEnd()) { // create server chosen connId with processId = 0 and workerId = 0 ServerConnectionIdParams params(0, 0, 0); @@ -290,7 +290,7 @@ class TestQuicTransport } else if (type == TestFrameType::DATAGRAM) { auto buffer = decodeDatagramFrame(cursor); auto frame = DatagramFrame(buffer.second, std::move(buffer.first)); - handleDatagram(*conn_, frame, data.packet.timings.receiveTimePoint); + handleDatagram(*conn_, frame, udpPacket.timings.receiveTimePoint); } else if (type == TestFrameType::STREAM_GROUP) { auto res = decodeStreamGroupBuffer(cursor); QuicStreamState* stream = diff --git a/quic/api/test/TestQuicTransport.h b/quic/api/test/TestQuicTransport.h index fa7c60935..82c265ca7 100644 --- a/quic/api/test/TestQuicTransport.h +++ b/quic/api/test/TestQuicTransport.h @@ -69,8 +69,8 @@ class TestQuicTransport } void onReadData( - const folly::SocketAddress& /*peer*/, - NetworkDataSingle&& /*networkData*/) noexcept override {} + const folly::SocketAddress& /* peer */, + ReceivedPacket&& /* udpPacket */) noexcept override {} void writeData() override { if (closed) { diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index c6f68d55b..9c0b0d9ce 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -120,9 +120,9 @@ QuicClientTransport::~QuicClientTransport() { void QuicClientTransport::processUdpPacket( const folly::SocketAddress& peer, - NetworkDataSingle&& networkData) { + ReceivedPacket&& udpPacket) { BufQueue udpData; - udpData.append(std::move(networkData.packet.buf)); + udpData.append(std::move(udpPacket.buf)); if (!conn_->version) { // We only check for version negotiation packets before the version @@ -143,7 +143,7 @@ void QuicClientTransport::processUdpPacket( for (uint16_t processedPackets = 0; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets; processedPackets++) { - processUdpPacketData(peer, networkData.packet.timings, udpData); + processUdpPacketData(peer, udpPacket.timings, udpData); } VLOG_IF(4, !udpData.empty()) << "Leaving " << udpData.chainLength() @@ -155,11 +155,9 @@ void QuicClientTransport::processUdpPacket( !clientConn_->pendingOneRttData.empty()) { BufQueue pendingPacket; for (auto& pendingData : clientConn_->pendingOneRttData) { - pendingPacket.append(std::move(pendingData.networkData.packet.buf)); + pendingPacket.append(std::move(pendingData.udpPacket.buf)); processUdpPacketData( - pendingData.peer, - pendingData.networkData.packet.timings, - pendingPacket); + pendingData.peer, pendingData.udpPacket.timings, pendingPacket); pendingPacket.move(); } clientConn_->pendingOneRttData.clear(); @@ -168,11 +166,9 @@ void QuicClientTransport::processUdpPacket( !clientConn_->pendingHandshakeData.empty()) { BufQueue pendingPacket; for (auto& pendingData : clientConn_->pendingHandshakeData) { - pendingPacket.append(std::move(pendingData.networkData.packet.buf)); + pendingPacket.append(std::move(pendingData.udpPacket.buf)); processUdpPacketData( - pendingData.peer, - pendingData.networkData.packet.timings, - pendingPacket); + pendingData.peer, pendingData.udpPacket.timings, pendingPacket); pendingPacket.move(); } clientConn_->pendingHandshakeData.clear(); @@ -269,10 +265,7 @@ void QuicClientTransport::processUdpPacketData( ? clientConn_->pendingOneRttData : clientConn_->pendingHandshakeData; pendingData.emplace_back( - NetworkDataSingle( - ReceivedPacket( - std::move(cipherUnavailable->packet), udpPacketTimings), - udpPacketTimings.receiveTimePoint), + ReceivedPacket(std::move(cipherUnavailable->packet), udpPacketTimings), peer); if (conn_->qLogger) { conn_->qLogger->addPacketBuffered( @@ -806,7 +799,7 @@ void QuicClientTransport::processUdpPacketData( void QuicClientTransport::onReadData( const folly::SocketAddress& peer, - NetworkDataSingle&& networkData) { + ReceivedPacket&& udpPacket) { if (closeState_ == CloseState::CLOSED) { // If we are closed, then we shouldn't process new network data. QUIC_STATS( @@ -817,7 +810,7 @@ void QuicClientTransport::onReadData( return; } bool waitingForFirstPacket = !hasReceivedPackets(*conn_); - processUdpPacket(peer, std::move(networkData)); + processUdpPacket(peer, std::move(udpPacket)); if (connSetupCallback_ && waitingForFirstPacket && hasReceivedPackets(*conn_)) { connSetupCallback_->onFirstPeerPacketProcessed(); diff --git a/quic/client/QuicClientTransport.h b/quic/client/QuicClientTransport.h index 56839ff61..3849ac3a2 100644 --- a/quic/client/QuicClientTransport.h +++ b/quic/client/QuicClientTransport.h @@ -122,9 +122,8 @@ class QuicClientTransport } // From QuicTransportBase - void onReadData( - const folly::SocketAddress& peer, - NetworkDataSingle&& networkData) override; + void onReadData(const folly::SocketAddress& peer, ReceivedPacket&& udpPacket) + override; void writeData() override; void closeTransport() override; void unbindConnection() override; @@ -247,7 +246,7 @@ class QuicClientTransport */ void processUdpPacket( const folly::SocketAddress& peer, - NetworkDataSingle&& networkData); + ReceivedPacket&& udpPacket); /** * Process data within a single UDP packet. diff --git a/quic/client/state/ClientStateMachine.h b/quic/client/state/ClientStateMachine.h index 0dd1db3dd..ca39f5b1b 100644 --- a/quic/client/state/ClientStateMachine.h +++ b/quic/client/state/ClientStateMachine.h @@ -22,13 +22,11 @@ namespace quic { struct CachedServerTransportParameters; struct PendingClientData { - NetworkDataSingle networkData; + ReceivedPacket udpPacket; folly::SocketAddress peer; - PendingClientData( - NetworkDataSingle networkDataIn, - folly::SocketAddress peerIn) - : networkData(std::move(networkDataIn)), peer(std::move(peerIn)) {} + PendingClientData(ReceivedPacket udpPacketIn, folly::SocketAddress peerIn) + : udpPacket(std::move(udpPacketIn)), peer(std::move(peerIn)) {} }; struct QuicClientConnectionState : public QuicConnectionStateBase { diff --git a/quic/common/NetworkData.h b/quic/common/NetworkData.h index 253ac8646..e796dffc2 100644 --- a/quic/common/NetworkData.h +++ b/quic/common/NetworkData.h @@ -125,29 +125,4 @@ struct NetworkData { size_t totalData_{0}; }; -struct NetworkDataSingle { - ReceivedPacket packet; - size_t totalData{0}; - - NetworkDataSingle() = default; - - explicit NetworkDataSingle(ReceivedPacket&& packetIn) - : packet(std::move(packetIn)) { - if (packet.buf) { - totalData += packet.buf->computeChainDataLength(); - } - } - - // TODO(bschlinker): Deprecate - NetworkDataSingle( - ReceivedPacket&& packetIn, - const TimePoint& receiveTimePointIn) - : packet(std::move(packetIn)) { - packet.timings.receiveTimePoint = receiveTimePointIn; - if (packet.buf) { - totalData += packet.buf->computeChainDataLength(); - } - } -}; - } // namespace quic diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 29597e09c..70aee1a50 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -144,10 +144,10 @@ void QuicServerTransport::setServerConnectionIdRejector( void QuicServerTransport::onReadData( const folly::SocketAddress& peer, - NetworkDataSingle&& networkData) { + ReceivedPacket&& udpPacket) { ServerEvents::ReadData readData; readData.peer = peer; - readData.networkData = std::move(networkData); + readData.udpPacket = std::move(udpPacket); bool waitingForFirstPacket = !hasReceivedPackets(*conn_); uint64_t prevWritableBytes = serverConn_->writableBytesLimit ? *serverConn_->writableBytesLimit @@ -512,8 +512,8 @@ void QuicServerTransport::processPendingData(bool async) { serverPtr->onNetworkData( pendingPacket.peer, NetworkData( - std::move(pendingPacket.networkData.packet.buf), - pendingPacket.networkData.packet.timings.receiveTimePoint)); + std::move(pendingPacket.udpPacket.buf), + pendingPacket.udpPacket.timings.receiveTimePoint)); if (serverPtr->closeState_ == CloseState::CLOSED) { // The pending data could potentially contain a connection close, or // the app could have triggered a connection close with an error. It diff --git a/quic/server/QuicServerTransport.h b/quic/server/QuicServerTransport.h index 3c4975e43..20ba46de7 100644 --- a/quic/server/QuicServerTransport.h +++ b/quic/server/QuicServerTransport.h @@ -133,9 +133,8 @@ class QuicServerTransport void verifiedClientAddress(); // From QuicTransportBase - void onReadData( - const folly::SocketAddress& peer, - NetworkDataSingle&& networkData) override; + void onReadData(const folly::SocketAddress& peer, ReceivedPacket&& udpPacket) + override; void writeData() override; void closeTransport() override; void unbindConnection() override; diff --git a/quic/server/state/ServerStateMachine.cpp b/quic/server/state/ServerStateMachine.cpp index 03fac4b91..09bbfbef5 100644 --- a/quic/server/state/ServerStateMachine.cpp +++ b/quic/server/state/ServerStateMachine.cpp @@ -713,9 +713,8 @@ static void handleCipherUnavailable( PacketDropReason::PARSE_ERROR_PACKET_BUFFERED); ServerEvents::ReadData pendingReadData; pendingReadData.peer = readData.peer; - pendingReadData.networkData = NetworkDataSingle( - ReceivedPacket(std::move(originalData->packet)), - readData.networkData.packet.timings.receiveTimePoint); + pendingReadData.udpPacket = ReceivedPacket( + std::move(originalData->packet), readData.udpPacket.timings); pendingData->emplace_back(std::move(pendingReadData)); VLOG(10) << "Adding pending data to " << toString(originalData->protectionType) @@ -739,15 +738,15 @@ void onServerReadDataFromOpen( ServerEvents::ReadData& readData) { CHECK_EQ(conn.state, ServerState::Open); // Don't bother parsing if the data is empty. - if (!readData.networkData.packet.buf || - readData.networkData.packet.buf->computeChainDataLength() == 0) { + if (!readData.udpPacket.buf || + readData.udpPacket.buf->computeChainDataLength() == 0) { return; } bool firstPacketFromPeer = false; if (!conn.readCodec) { firstPacketFromPeer = true; - folly::io::Cursor cursor(readData.networkData.packet.buf.get()); + folly::io::Cursor cursor(readData.udpPacket.buf.get()); auto initialByte = cursor.readBE(); auto parsedLongHeader = parseLongHeaderInvariant(initialByte, cursor); if (!parsedLongHeader) { @@ -856,7 +855,7 @@ void onServerReadDataFromOpen( conn.peerAddress = conn.originalPeerAddress; } BufQueue udpData; - udpData.append(std::move(readData.networkData.packet.buf)); + udpData.append(std::move(readData.udpPacket.buf)); for (uint16_t processedPackets = 0; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets; processedPackets++) { @@ -1022,10 +1021,7 @@ void onServerReadDataFromOpen( auto& ackState = getAckState(conn, packetNumberSpace); uint64_t distanceFromExpectedPacketNum = updateLargestReceivedPacketNum( - conn, - ackState, - packetNum, - readData.networkData.packet.timings.receiveTimePoint); + conn, ackState, packetNum, readData.udpPacket.timings.receiveTimePoint); if (distanceFromExpectedPacketNum > 0) { QUIC_STATS(conn.statsCallback, onOutOfOrderPacketReceived); } @@ -1113,7 +1109,7 @@ void onServerReadDataFromOpen( } }, markPacketLoss, - readData.networkData.packet.timings.receiveTimePoint)); + readData.udpPacket.timings.receiveTimePoint)); break; } case QuicFrame::Type::RstStreamFrame: { @@ -1272,9 +1268,7 @@ void onServerReadDataFromOpen( // early. So, make Datagram frames count towards ack policy pktHasRetransmittableData = true; handleDatagram( - conn, - frame, - readData.networkData.packet.timings.receiveTimePoint); + conn, frame, readData.udpPacket.timings.receiveTimePoint); break; } case QuicFrame::Type::ImmediateAckFrame: { @@ -1391,7 +1385,7 @@ void onServerReadDataFromClosed( ServerEvents::ReadData& readData) { CHECK_EQ(conn.state, ServerState::Closed); BufQueue udpData; - udpData.append(std::move(readData.networkData.packet.buf)); + udpData.append(std::move(readData.udpPacket.buf)); auto packetSize = udpData.empty() ? 0 : udpData.chainLength(); if (!conn.readCodec) { // drop data. We closed before we even got the first packet. This is diff --git a/quic/server/state/ServerStateMachine.h b/quic/server/state/ServerStateMachine.h index 1338d49a4..0225fd716 100644 --- a/quic/server/state/ServerStateMachine.h +++ b/quic/server/state/ServerStateMachine.h @@ -44,7 +44,7 @@ enum ServerState { struct ServerEvents { struct ReadData { folly::SocketAddress peer; - NetworkDataSingle networkData; + ReceivedPacket udpPacket; }; struct Close {};