From 086822ca7693c410cf813c2065ae43331ad5ff7d Mon Sep 17 00:00:00 2001 From: Brandon Schlinker Date: Thu, 21 Sep 2023 07:57:58 -0700 Subject: [PATCH] Cleanup and modularize receive path, improve timestamp support [2/x] Summary: This diff: - Changes `NetworkDataSingle` to have `ReceivedPacket` instead of `Buf`, in line with earlier change to `NetworkData` in D48714615 -- This diff is part of a larger stack focused on the following: - **Cleaning up client and server UDP packet receive paths while improving testability.** We currently have multiple receive paths for client and server. Capabilities vary significantly and there are few tests. For instance: - The server receive path supports socket RX timestamps, abet incorrectly in that it does not store timestamp per packet. In comparison, the client receive path does not currently support socket RX timestamps, although the code in `QuicClientTransport::recvmsg` and `QuicClientTransport::recvmmsg` makes reference to socket RX timestamps, making it confusing to understand the capabilities available when tracing through the code. This complicates the tests in `QuicTypedTransportTests`, as we have to disable test logic that depends on socket RX timestamps for client tests. - The client currently has three receive paths, and none of them are well tested. - **Modularize and abstract components in the receive path.** This will make it easier to mock/fake the UDP socket and network layers. - `QuicClientTransport` and `QuicServerTransport` currently contain UDP socket handling logic that operates over lower layer primitives such `cmsg` and `io_vec` (see `QuicClientTransport::recvmmsg` and `...::recvmsg` as examples). - Because this UDP socket handling logic is inside of the mvfst transport implementations, it is difficult to test this logic in isolation and mock/fake the underlying socket and network layers. For instance, injecting a user space network emulator that operates at the socket layer would require faking `folly::AsyncUDPSocket`, which is non-trivial given that `AsyncUDPSocket` does not abstract away intricacies arising from the aforementioned lower layer primitives. - By shifting this logic into an intermediate layer between the transport and the underlying UDP socket, it will be easier to mock out the UDP socket layer when testing functionality at higher layers, and inject fake components when we want to emulate the network between a mvfst client and server. It will also be easier for us to have unit tests focused on testing interactions between the UDP socket implementation and this intermediate layer. - **Improving receive path timestamping.** We only record a single timestamp per `NetworkData` at the moment, but (1) it is possible for a `NetworkData` to have multiple packets, each with their own timestamps, and (2) we should be able to record both userspace and socket timestamps. Reviewed By: mjoras Differential Revision: D48714796 fbshipit-source-id: d96c2abc81e7c27a01bcd0dd552f274a0c1ede26 --- quic/api/QuicTransportBase.cpp | 3 ++- quic/api/test/QuicTransportBaseTest.cpp | 4 ++-- quic/client/QuicClientTransport.cpp | 9 +++++---- quic/server/QuicServerTransport.cpp | 2 +- quic/server/state/ServerStateMachine.cpp | 13 +++++++------ quic/state/StateData.h | 19 +++++++++++-------- 6 files changed, 28 insertions(+), 22 deletions(-) diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 772fec96c..774d6ef8e 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1905,7 +1905,8 @@ void QuicTransportBase::onNetworkData( onReadData( peer, NetworkDataSingle( - std::move(packet.buf), networkData.receiveTimePoint)); + ReceivedPacket(std::move(packet.buf)), + networkData.receiveTimePoint)); if (conn_->peerConnectionError) { closeImpl(QuicError( QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed")); diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 191bf0bc4..d14004e5e 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -265,10 +265,10 @@ class TestQuicTransport void onReadData(const folly::SocketAddress&, NetworkDataSingle&& data) override { - if (!data.data) { + if (!data.packet.buf) { return; } - folly::io::Cursor cursor(data.data.get()); + folly::io::Cursor cursor(data.packet.buf.get()); while (!cursor.isAtEnd()) { // create server chosen connId with processId = 0 and workerId = 0 ServerConnectionIdParams params(0, 0, 0); diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index 6e6319ef4..afd4761fb 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -122,7 +122,7 @@ void QuicClientTransport::processUDPData( const folly::SocketAddress& peer, NetworkDataSingle&& networkData) { BufQueue udpData; - udpData.append(std::move(networkData.data)); + udpData.append(std::move(networkData.packet.buf)); if (!conn_->version) { // We only check for version negotiation packets before the version @@ -155,7 +155,7 @@ void QuicClientTransport::processUDPData( !clientConn_->pendingOneRttData.empty()) { BufQueue pendingPacket; for (auto& pendingData : clientConn_->pendingOneRttData) { - pendingPacket.append(std::move(pendingData.networkData.data)); + pendingPacket.append(std::move(pendingData.networkData.packet.buf)); processPacketData( pendingData.peer, pendingData.networkData.receiveTimePoint, @@ -168,7 +168,7 @@ void QuicClientTransport::processUDPData( !clientConn_->pendingHandshakeData.empty()) { BufQueue pendingPacket; for (auto& pendingData : clientConn_->pendingHandshakeData) { - pendingPacket.append(std::move(pendingData.networkData.data)); + pendingPacket.append(std::move(pendingData.networkData.packet.buf)); processPacketData( pendingData.peer, pendingData.networkData.receiveTimePoint, @@ -270,7 +270,8 @@ void QuicClientTransport::processPacketData( : clientConn_->pendingHandshakeData; pendingData.emplace_back( NetworkDataSingle( - std::move(cipherUnavailable->packet), receiveTimePoint), + ReceivedPacket(std::move(cipherUnavailable->packet)), + receiveTimePoint), peer); if (conn_->qLogger) { conn_->qLogger->addPacketBuffered( diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index b1fed6506..22a603be0 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -512,7 +512,7 @@ void QuicServerTransport::processPendingData(bool async) { serverPtr->onNetworkData( pendingPacket.peer, NetworkData( - std::move(pendingPacket.networkData.data), + std::move(pendingPacket.networkData.packet.buf), pendingPacket.networkData.receiveTimePoint)); if (serverPtr->closeState_ == CloseState::CLOSED) { // The pending data could potentially contain a connection close, or diff --git a/quic/server/state/ServerStateMachine.cpp b/quic/server/state/ServerStateMachine.cpp index 6742f6863..7c16bf0d1 100644 --- a/quic/server/state/ServerStateMachine.cpp +++ b/quic/server/state/ServerStateMachine.cpp @@ -707,7 +707,8 @@ static void handleCipherUnavailable( ServerEvents::ReadData pendingReadData; pendingReadData.peer = readData.peer; pendingReadData.networkData = NetworkDataSingle( - std::move(originalData->packet), readData.networkData.receiveTimePoint); + ReceivedPacket(std::move(originalData->packet)), + readData.networkData.receiveTimePoint); pendingData->emplace_back(std::move(pendingReadData)); VLOG(10) << "Adding pending data to " << toString(originalData->protectionType) @@ -731,15 +732,15 @@ void onServerReadDataFromOpen( ServerEvents::ReadData& readData) { CHECK_EQ(conn.state, ServerState::Open); // Don't bother parsing if the data is empty. - if (!readData.networkData.data || - readData.networkData.data->computeChainDataLength() == 0) { + if (!readData.networkData.packet.buf || + readData.networkData.packet.buf->computeChainDataLength() == 0) { return; } bool firstPacketFromPeer = false; if (!conn.readCodec) { firstPacketFromPeer = true; - folly::io::Cursor cursor(readData.networkData.data.get()); + folly::io::Cursor cursor(readData.networkData.packet.buf.get()); auto initialByte = cursor.readBE(); auto parsedLongHeader = parseLongHeaderInvariant(initialByte, cursor); if (!parsedLongHeader) { @@ -848,7 +849,7 @@ void onServerReadDataFromOpen( conn.peerAddress = conn.originalPeerAddress; } BufQueue udpData; - udpData.append(std::move(readData.networkData.data)); + udpData.append(std::move(readData.networkData.packet.buf)); for (uint16_t processedPackets = 0; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets; processedPackets++) { @@ -1374,7 +1375,7 @@ void onServerReadDataFromClosed( ServerEvents::ReadData& readData) { CHECK_EQ(conn.state, ServerState::Closed); BufQueue udpData; - udpData.append(std::move(readData.networkData.data)); + udpData.append(std::move(readData.networkData.packet.buf)); auto packetSize = udpData.empty() ? 0 : udpData.chainLength(); if (!conn.readCodec) { // drop data. We closed before we even got the first packet. This is diff --git a/quic/state/StateData.h b/quic/state/StateData.h index e289de365..abe0147cd 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -45,6 +45,7 @@ namespace quic { struct ReceivedPacket { + ReceivedPacket() = default; explicit ReceivedPacket(Buf&& bufIn) : buf(std::move(bufIn)) {} // data @@ -65,8 +66,10 @@ struct NetworkData { } } - NetworkData(std::vector&& packetBufs, const TimePoint& receiveTime) - : receiveTimePoint(receiveTime), + NetworkData( + std::vector&& packetBufs, + const TimePoint& receiveTimePointIn) + : receiveTimePoint(receiveTimePointIn), packets([&packetBufs]() { std::vector result; result.reserve(packetBufs.size()); @@ -97,18 +100,18 @@ struct NetworkData { }; struct NetworkDataSingle { - Buf data; + ReceivedPacket packet; TimePoint receiveTimePoint; size_t totalData{0}; NetworkDataSingle() = default; NetworkDataSingle( - std::unique_ptr buf, - const TimePoint& receiveTime) - : data(std::move(buf)), receiveTimePoint(receiveTime) { - if (data) { - totalData += data->computeChainDataLength(); + ReceivedPacket&& packetIn, + const TimePoint& receiveTimePointIn) + : packet(std::move(packetIn)), receiveTimePoint(receiveTimePointIn) { + if (packet.buf) { + totalData += packet.buf->computeChainDataLength(); } } };