1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Cleanup and modularize receive path, improve timestamp support [15/x]

Summary:
This diff drops `NetworkDataSingle` in favor of `ReceivedPacket`. The latter contains a `ReceivedPacket::Timings` field that has the same `receiveTimePoint` currently in `NetworkDataSingle`, while also providing other useful signals.

--

This diff is part of a larger stack focused on the following:

- **Cleaning up client and server UDP packet receive paths while improving testability.** We currently have multiple receive paths for client and server. Capabilities vary significantly and there are few tests. For instance:
  - The server receive path supports socket RX timestamps, abet incorrectly in that it does not store timestamp per packet. In comparison, the client receive path does not currently support socket RX timestamps, although the code in `QuicClientTransport::recvmsg` and `QuicClientTransport::recvmmsg` makes reference to socket RX timestamps, making it confusing to understand the capabilities available when tracing through the code. This complicates the tests in `QuicTypedTransportTests`, as we have to disable test logic that depends on socket RX timestamps for client tests.
  - The client currently has three receive paths, and none of them are well tested.

- **Modularize and abstract components in the receive path.** This will make it easier to mock/fake the UDP socket and network layers.
  - `QuicClientTransport` and `QuicServerTransport` currently contain UDP socket handling logic that operates over lower layer primitives such `cmsg` and `io_vec` (see `QuicClientTransport::recvmmsg` and `...::recvmsg` as examples).
  - Because this UDP socket handling logic is inside of the mvfst transport implementations, it is difficult to test this logic in isolation and mock/fake the underlying socket and network layers. For instance, injecting a user space network emulator that operates at the socket layer would require faking `folly::AsyncUDPSocket`, which is non-trivial given that `AsyncUDPSocket` does not abstract away intricacies arising from the aforementioned lower layer primitives.
  - By shifting this logic into an intermediate layer between the transport and the underlying UDP socket, it will be easier to mock out the UDP socket layer when testing functionality at higher layers, and inject fake components when we want to emulate the network between a mvfst client and server. It will also be easier for us to have unit tests focused on testing interactions between the UDP socket implementation and this intermediate layer.

- **Improving receive path timestamping.** We only record a single timestamp per `NetworkData` at the moment, but (1) it is possible for a `NetworkData` to have multiple packets, each with their own timestamps, and (2) we should be able to record both userspace and socket timestamps

Reviewed By: silver23arrow

Differential Revision: D48739219

fbshipit-source-id: fc2cdb7b425d68c729dd3bec00b6c6ff3c4bf8ec
This commit is contained in:
Brandon Schlinker
2023-11-05 19:58:46 -08:00
committed by Facebook GitHub Bot
parent d5ba8c54cd
commit 04facac67d
12 changed files with 41 additions and 83 deletions

View File

@@ -1905,7 +1905,7 @@ void QuicTransportBase::onNetworkData(
auto packets = std::move(networkData).movePackets(); auto packets = std::move(networkData).movePackets();
for (auto& packet : packets) { for (auto& packet : packets) {
onReadData(peer, NetworkDataSingle(std::move(packet))); onReadData(peer, std::move(packet));
if (conn_->peerConnectionError) { if (conn_->peerConnectionError) {
closeImpl(QuicError( closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed")); QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed"));

View File

@@ -309,7 +309,7 @@ class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver {
*/ */
virtual void onReadData( virtual void onReadData(
const folly::SocketAddress& peer, const folly::SocketAddress& peer,
NetworkDataSingle&& networkData) = 0; ReceivedPacket&& udpPacket) = 0;
/** /**
* Invoked when we have to write some data to the wire. * Invoked when we have to write some data to the wire.

View File

@@ -263,12 +263,12 @@ class TestQuicTransport
return lossTimeout_.getTimeRemaining(); return lossTimeout_.getTimeRemaining();
} }
void onReadData(const folly::SocketAddress&, NetworkDataSingle&& data) void onReadData(const folly::SocketAddress&, ReceivedPacket&& udpPacket)
override { override {
if (!data.packet.buf) { if (!udpPacket.buf) {
return; return;
} }
folly::io::Cursor cursor(data.packet.buf.get()); folly::io::Cursor cursor(udpPacket.buf.get());
while (!cursor.isAtEnd()) { while (!cursor.isAtEnd()) {
// create server chosen connId with processId = 0 and workerId = 0 // create server chosen connId with processId = 0 and workerId = 0
ServerConnectionIdParams params(0, 0, 0); ServerConnectionIdParams params(0, 0, 0);
@@ -290,7 +290,7 @@ class TestQuicTransport
} else if (type == TestFrameType::DATAGRAM) { } else if (type == TestFrameType::DATAGRAM) {
auto buffer = decodeDatagramFrame(cursor); auto buffer = decodeDatagramFrame(cursor);
auto frame = DatagramFrame(buffer.second, std::move(buffer.first)); auto frame = DatagramFrame(buffer.second, std::move(buffer.first));
handleDatagram(*conn_, frame, data.packet.timings.receiveTimePoint); handleDatagram(*conn_, frame, udpPacket.timings.receiveTimePoint);
} else if (type == TestFrameType::STREAM_GROUP) { } else if (type == TestFrameType::STREAM_GROUP) {
auto res = decodeStreamGroupBuffer(cursor); auto res = decodeStreamGroupBuffer(cursor);
QuicStreamState* stream = QuicStreamState* stream =

View File

@@ -70,7 +70,7 @@ class TestQuicTransport
void onReadData( void onReadData(
const folly::SocketAddress& /* peer */, const folly::SocketAddress& /* peer */,
NetworkDataSingle&& /*networkData*/) noexcept override {} ReceivedPacket&& /* udpPacket */) noexcept override {}
void writeData() override { void writeData() override {
if (closed) { if (closed) {

View File

@@ -120,9 +120,9 @@ QuicClientTransport::~QuicClientTransport() {
void QuicClientTransport::processUdpPacket( void QuicClientTransport::processUdpPacket(
const folly::SocketAddress& peer, const folly::SocketAddress& peer,
NetworkDataSingle&& networkData) { ReceivedPacket&& udpPacket) {
BufQueue udpData; BufQueue udpData;
udpData.append(std::move(networkData.packet.buf)); udpData.append(std::move(udpPacket.buf));
if (!conn_->version) { if (!conn_->version) {
// We only check for version negotiation packets before the version // We only check for version negotiation packets before the version
@@ -143,7 +143,7 @@ void QuicClientTransport::processUdpPacket(
for (uint16_t processedPackets = 0; for (uint16_t processedPackets = 0;
!udpData.empty() && processedPackets < kMaxNumCoalescedPackets; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets;
processedPackets++) { processedPackets++) {
processUdpPacketData(peer, networkData.packet.timings, udpData); processUdpPacketData(peer, udpPacket.timings, udpData);
} }
VLOG_IF(4, !udpData.empty()) VLOG_IF(4, !udpData.empty())
<< "Leaving " << udpData.chainLength() << "Leaving " << udpData.chainLength()
@@ -155,11 +155,9 @@ void QuicClientTransport::processUdpPacket(
!clientConn_->pendingOneRttData.empty()) { !clientConn_->pendingOneRttData.empty()) {
BufQueue pendingPacket; BufQueue pendingPacket;
for (auto& pendingData : clientConn_->pendingOneRttData) { for (auto& pendingData : clientConn_->pendingOneRttData) {
pendingPacket.append(std::move(pendingData.networkData.packet.buf)); pendingPacket.append(std::move(pendingData.udpPacket.buf));
processUdpPacketData( processUdpPacketData(
pendingData.peer, pendingData.peer, pendingData.udpPacket.timings, pendingPacket);
pendingData.networkData.packet.timings,
pendingPacket);
pendingPacket.move(); pendingPacket.move();
} }
clientConn_->pendingOneRttData.clear(); clientConn_->pendingOneRttData.clear();
@@ -168,11 +166,9 @@ void QuicClientTransport::processUdpPacket(
!clientConn_->pendingHandshakeData.empty()) { !clientConn_->pendingHandshakeData.empty()) {
BufQueue pendingPacket; BufQueue pendingPacket;
for (auto& pendingData : clientConn_->pendingHandshakeData) { for (auto& pendingData : clientConn_->pendingHandshakeData) {
pendingPacket.append(std::move(pendingData.networkData.packet.buf)); pendingPacket.append(std::move(pendingData.udpPacket.buf));
processUdpPacketData( processUdpPacketData(
pendingData.peer, pendingData.peer, pendingData.udpPacket.timings, pendingPacket);
pendingData.networkData.packet.timings,
pendingPacket);
pendingPacket.move(); pendingPacket.move();
} }
clientConn_->pendingHandshakeData.clear(); clientConn_->pendingHandshakeData.clear();
@@ -269,10 +265,7 @@ void QuicClientTransport::processUdpPacketData(
? clientConn_->pendingOneRttData ? clientConn_->pendingOneRttData
: clientConn_->pendingHandshakeData; : clientConn_->pendingHandshakeData;
pendingData.emplace_back( pendingData.emplace_back(
NetworkDataSingle( ReceivedPacket(std::move(cipherUnavailable->packet), udpPacketTimings),
ReceivedPacket(
std::move(cipherUnavailable->packet), udpPacketTimings),
udpPacketTimings.receiveTimePoint),
peer); peer);
if (conn_->qLogger) { if (conn_->qLogger) {
conn_->qLogger->addPacketBuffered( conn_->qLogger->addPacketBuffered(
@@ -806,7 +799,7 @@ void QuicClientTransport::processUdpPacketData(
void QuicClientTransport::onReadData( void QuicClientTransport::onReadData(
const folly::SocketAddress& peer, const folly::SocketAddress& peer,
NetworkDataSingle&& networkData) { ReceivedPacket&& udpPacket) {
if (closeState_ == CloseState::CLOSED) { if (closeState_ == CloseState::CLOSED) {
// If we are closed, then we shouldn't process new network data. // If we are closed, then we shouldn't process new network data.
QUIC_STATS( QUIC_STATS(
@@ -817,7 +810,7 @@ void QuicClientTransport::onReadData(
return; return;
} }
bool waitingForFirstPacket = !hasReceivedPackets(*conn_); bool waitingForFirstPacket = !hasReceivedPackets(*conn_);
processUdpPacket(peer, std::move(networkData)); processUdpPacket(peer, std::move(udpPacket));
if (connSetupCallback_ && waitingForFirstPacket && if (connSetupCallback_ && waitingForFirstPacket &&
hasReceivedPackets(*conn_)) { hasReceivedPackets(*conn_)) {
connSetupCallback_->onFirstPeerPacketProcessed(); connSetupCallback_->onFirstPeerPacketProcessed();

View File

@@ -122,9 +122,8 @@ class QuicClientTransport
} }
// From QuicTransportBase // From QuicTransportBase
void onReadData( void onReadData(const folly::SocketAddress& peer, ReceivedPacket&& udpPacket)
const folly::SocketAddress& peer, override;
NetworkDataSingle&& networkData) override;
void writeData() override; void writeData() override;
void closeTransport() override; void closeTransport() override;
void unbindConnection() override; void unbindConnection() override;
@@ -247,7 +246,7 @@ class QuicClientTransport
*/ */
void processUdpPacket( void processUdpPacket(
const folly::SocketAddress& peer, const folly::SocketAddress& peer,
NetworkDataSingle&& networkData); ReceivedPacket&& udpPacket);
/** /**
* Process data within a single UDP packet. * Process data within a single UDP packet.

View File

@@ -22,13 +22,11 @@ namespace quic {
struct CachedServerTransportParameters; struct CachedServerTransportParameters;
struct PendingClientData { struct PendingClientData {
NetworkDataSingle networkData; ReceivedPacket udpPacket;
folly::SocketAddress peer; folly::SocketAddress peer;
PendingClientData( PendingClientData(ReceivedPacket udpPacketIn, folly::SocketAddress peerIn)
NetworkDataSingle networkDataIn, : udpPacket(std::move(udpPacketIn)), peer(std::move(peerIn)) {}
folly::SocketAddress peerIn)
: networkData(std::move(networkDataIn)), peer(std::move(peerIn)) {}
}; };
struct QuicClientConnectionState : public QuicConnectionStateBase { struct QuicClientConnectionState : public QuicConnectionStateBase {

View File

@@ -125,29 +125,4 @@ struct NetworkData {
size_t totalData_{0}; size_t totalData_{0};
}; };
struct NetworkDataSingle {
ReceivedPacket packet;
size_t totalData{0};
NetworkDataSingle() = default;
explicit NetworkDataSingle(ReceivedPacket&& packetIn)
: packet(std::move(packetIn)) {
if (packet.buf) {
totalData += packet.buf->computeChainDataLength();
}
}
// TODO(bschlinker): Deprecate
NetworkDataSingle(
ReceivedPacket&& packetIn,
const TimePoint& receiveTimePointIn)
: packet(std::move(packetIn)) {
packet.timings.receiveTimePoint = receiveTimePointIn;
if (packet.buf) {
totalData += packet.buf->computeChainDataLength();
}
}
};
} // namespace quic } // namespace quic

View File

@@ -144,10 +144,10 @@ void QuicServerTransport::setServerConnectionIdRejector(
void QuicServerTransport::onReadData( void QuicServerTransport::onReadData(
const folly::SocketAddress& peer, const folly::SocketAddress& peer,
NetworkDataSingle&& networkData) { ReceivedPacket&& udpPacket) {
ServerEvents::ReadData readData; ServerEvents::ReadData readData;
readData.peer = peer; readData.peer = peer;
readData.networkData = std::move(networkData); readData.udpPacket = std::move(udpPacket);
bool waitingForFirstPacket = !hasReceivedPackets(*conn_); bool waitingForFirstPacket = !hasReceivedPackets(*conn_);
uint64_t prevWritableBytes = serverConn_->writableBytesLimit uint64_t prevWritableBytes = serverConn_->writableBytesLimit
? *serverConn_->writableBytesLimit ? *serverConn_->writableBytesLimit
@@ -512,8 +512,8 @@ void QuicServerTransport::processPendingData(bool async) {
serverPtr->onNetworkData( serverPtr->onNetworkData(
pendingPacket.peer, pendingPacket.peer,
NetworkData( NetworkData(
std::move(pendingPacket.networkData.packet.buf), std::move(pendingPacket.udpPacket.buf),
pendingPacket.networkData.packet.timings.receiveTimePoint)); pendingPacket.udpPacket.timings.receiveTimePoint));
if (serverPtr->closeState_ == CloseState::CLOSED) { if (serverPtr->closeState_ == CloseState::CLOSED) {
// The pending data could potentially contain a connection close, or // The pending data could potentially contain a connection close, or
// the app could have triggered a connection close with an error. It // the app could have triggered a connection close with an error. It

View File

@@ -133,9 +133,8 @@ class QuicServerTransport
void verifiedClientAddress(); void verifiedClientAddress();
// From QuicTransportBase // From QuicTransportBase
void onReadData( void onReadData(const folly::SocketAddress& peer, ReceivedPacket&& udpPacket)
const folly::SocketAddress& peer, override;
NetworkDataSingle&& networkData) override;
void writeData() override; void writeData() override;
void closeTransport() override; void closeTransport() override;
void unbindConnection() override; void unbindConnection() override;

View File

@@ -713,9 +713,8 @@ static void handleCipherUnavailable(
PacketDropReason::PARSE_ERROR_PACKET_BUFFERED); PacketDropReason::PARSE_ERROR_PACKET_BUFFERED);
ServerEvents::ReadData pendingReadData; ServerEvents::ReadData pendingReadData;
pendingReadData.peer = readData.peer; pendingReadData.peer = readData.peer;
pendingReadData.networkData = NetworkDataSingle( pendingReadData.udpPacket = ReceivedPacket(
ReceivedPacket(std::move(originalData->packet)), std::move(originalData->packet), readData.udpPacket.timings);
readData.networkData.packet.timings.receiveTimePoint);
pendingData->emplace_back(std::move(pendingReadData)); pendingData->emplace_back(std::move(pendingReadData));
VLOG(10) << "Adding pending data to " VLOG(10) << "Adding pending data to "
<< toString(originalData->protectionType) << toString(originalData->protectionType)
@@ -739,15 +738,15 @@ void onServerReadDataFromOpen(
ServerEvents::ReadData& readData) { ServerEvents::ReadData& readData) {
CHECK_EQ(conn.state, ServerState::Open); CHECK_EQ(conn.state, ServerState::Open);
// Don't bother parsing if the data is empty. // Don't bother parsing if the data is empty.
if (!readData.networkData.packet.buf || if (!readData.udpPacket.buf ||
readData.networkData.packet.buf->computeChainDataLength() == 0) { readData.udpPacket.buf->computeChainDataLength() == 0) {
return; return;
} }
bool firstPacketFromPeer = false; bool firstPacketFromPeer = false;
if (!conn.readCodec) { if (!conn.readCodec) {
firstPacketFromPeer = true; firstPacketFromPeer = true;
folly::io::Cursor cursor(readData.networkData.packet.buf.get()); folly::io::Cursor cursor(readData.udpPacket.buf.get());
auto initialByte = cursor.readBE<uint8_t>(); auto initialByte = cursor.readBE<uint8_t>();
auto parsedLongHeader = parseLongHeaderInvariant(initialByte, cursor); auto parsedLongHeader = parseLongHeaderInvariant(initialByte, cursor);
if (!parsedLongHeader) { if (!parsedLongHeader) {
@@ -856,7 +855,7 @@ void onServerReadDataFromOpen(
conn.peerAddress = conn.originalPeerAddress; conn.peerAddress = conn.originalPeerAddress;
} }
BufQueue udpData; BufQueue udpData;
udpData.append(std::move(readData.networkData.packet.buf)); udpData.append(std::move(readData.udpPacket.buf));
for (uint16_t processedPackets = 0; for (uint16_t processedPackets = 0;
!udpData.empty() && processedPackets < kMaxNumCoalescedPackets; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets;
processedPackets++) { processedPackets++) {
@@ -1022,10 +1021,7 @@ void onServerReadDataFromOpen(
auto& ackState = getAckState(conn, packetNumberSpace); auto& ackState = getAckState(conn, packetNumberSpace);
uint64_t distanceFromExpectedPacketNum = updateLargestReceivedPacketNum( uint64_t distanceFromExpectedPacketNum = updateLargestReceivedPacketNum(
conn, conn, ackState, packetNum, readData.udpPacket.timings.receiveTimePoint);
ackState,
packetNum,
readData.networkData.packet.timings.receiveTimePoint);
if (distanceFromExpectedPacketNum > 0) { if (distanceFromExpectedPacketNum > 0) {
QUIC_STATS(conn.statsCallback, onOutOfOrderPacketReceived); QUIC_STATS(conn.statsCallback, onOutOfOrderPacketReceived);
} }
@@ -1113,7 +1109,7 @@ void onServerReadDataFromOpen(
} }
}, },
markPacketLoss, markPacketLoss,
readData.networkData.packet.timings.receiveTimePoint)); readData.udpPacket.timings.receiveTimePoint));
break; break;
} }
case QuicFrame::Type::RstStreamFrame: { case QuicFrame::Type::RstStreamFrame: {
@@ -1272,9 +1268,7 @@ void onServerReadDataFromOpen(
// early. So, make Datagram frames count towards ack policy // early. So, make Datagram frames count towards ack policy
pktHasRetransmittableData = true; pktHasRetransmittableData = true;
handleDatagram( handleDatagram(
conn, conn, frame, readData.udpPacket.timings.receiveTimePoint);
frame,
readData.networkData.packet.timings.receiveTimePoint);
break; break;
} }
case QuicFrame::Type::ImmediateAckFrame: { case QuicFrame::Type::ImmediateAckFrame: {
@@ -1391,7 +1385,7 @@ void onServerReadDataFromClosed(
ServerEvents::ReadData& readData) { ServerEvents::ReadData& readData) {
CHECK_EQ(conn.state, ServerState::Closed); CHECK_EQ(conn.state, ServerState::Closed);
BufQueue udpData; BufQueue udpData;
udpData.append(std::move(readData.networkData.packet.buf)); udpData.append(std::move(readData.udpPacket.buf));
auto packetSize = udpData.empty() ? 0 : udpData.chainLength(); auto packetSize = udpData.empty() ? 0 : udpData.chainLength();
if (!conn.readCodec) { if (!conn.readCodec) {
// drop data. We closed before we even got the first packet. This is // drop data. We closed before we even got the first packet. This is

View File

@@ -44,7 +44,7 @@ enum ServerState {
struct ServerEvents { struct ServerEvents {
struct ReadData { struct ReadData {
folly::SocketAddress peer; folly::SocketAddress peer;
NetworkDataSingle networkData; ReceivedPacket udpPacket;
}; };
struct Close {}; struct Close {};