1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-09 20:42:44 +03:00

Cleanup and modularize receive path, improve timestamp support [11/x]

Summary:
This diff:
1. Introduces a new `ReceivedPacket::Timings` structure, which will be expanded upon in subsequent diffs.
2. Adds a `ReceivedPacket::Timings` field to each `ReceivedPacket`
3. Uses the accessors added in the previous diff (D48724715) to populate the `ReceivedPacket::Timings` structure in each `ReceivedPacket` held by a `NetworkData` object. This is done by propagating the `NetworkData::receiveTimePoint` field to all `ReceivedPacket` held in a `NetworkData.` This propagation occurs each time a `ReceivedPacket` is added. The value is propagated again if the `NetworkData::receiveTimePoint` field is updated by looping over all previously added `ReceivedPacket`.

--

This diff is part of a larger stack focused on the following:

- **Cleaning up client and server UDP packet receive paths while improving testability.** We currently have multiple receive paths for client and server. Capabilities vary significantly and there are few tests. For instance:
  - The server receive path supports socket RX timestamps, abet incorrectly in that it does not store timestamp per packet. In comparison, the client receive path does not currently support socket RX timestamps, although the code in `QuicClientTransport::recvmsg` and `QuicClientTransport::recvmmsg` makes reference to socket RX timestamps, making it confusing to understand the capabilities available when tracing through the code. This complicates the tests in `QuicTypedTransportTests`, as we have to disable test logic that depends on socket RX timestamps for client tests.
  - The client currently has three receive paths, and none of them are well tested.

- **Modularize and abstract components in the receive path.** This will make it easier to mock/fake the UDP socket and network layers.
  - `QuicClientTransport` and `QuicServerTransport` currently contain UDP socket handling logic that operates over lower layer primitives such `cmsg` and `io_vec` (see `QuicClientTransport::recvmmsg` and `...::recvmsg` as examples).
  - Because this UDP socket handling logic is inside of the mvfst transport implementations, it is difficult to test this logic in isolation and mock/fake the underlying socket and network layers. For instance, injecting a user space network emulator that operates at the socket layer would require faking `folly::AsyncUDPSocket`, which is non-trivial given that `AsyncUDPSocket` does not abstract away intricacies arising from the aforementioned lower layer primitives.
  - By shifting this logic into an intermediate layer between the transport and the underlying UDP socket, it will be easier to mock out the UDP socket layer when testing functionality at higher layers, and inject fake components when we want to emulate the network between a mvfst client and server. It will also be easier for us to have unit tests focused on testing interactions between the UDP socket implementation and this intermediate layer.

- **Improving receive path timestamping.** We only record a single timestamp per `NetworkData` at the moment, but (1) it is possible for a `NetworkData` to have multiple packets, each with their own timestamps, and (2) we should be able to record both userspace and socket timestamps.

Reviewed By: jbeshay

Differential Revision: D48725209

fbshipit-source-id: 580e7d7d1f3587f9947774b5ed19e9985df404c9
This commit is contained in:
Brandon Schlinker
2023-11-05 19:58:46 -08:00
committed by Facebook GitHub Bot
parent cb10ddf0ce
commit 05b98a99db
9 changed files with 65 additions and 32 deletions

View File

@@ -1889,7 +1889,7 @@ void QuicTransportBase::onNetworkData(
builder.addReceivedPacket( builder.addReceivedPacket(
SocketObserverInterface::PacketsReceivedEvent::ReceivedPacket:: SocketObserverInterface::PacketsReceivedEvent::ReceivedPacket::
Builder() Builder()
.setPacketReceiveTime(networkData.getReceiveTimePoint()) .setPacketReceiveTime(packet.timings.receiveTimePoint)
.setPacketNumBytes(packet.buf->computeChainDataLength()) .setPacketNumBytes(packet.buf->computeChainDataLength())
.build()); .build());
} }
@@ -1903,13 +1903,9 @@ void QuicTransportBase::onNetworkData(
}); });
} }
const auto receiveTimePoint = networkData.getReceiveTimePoint();
auto packets = std::move(networkData).movePackets(); auto packets = std::move(networkData).movePackets();
for (auto& packet : packets) { for (auto& packet : packets) {
onReadData( onReadData(peer, NetworkDataSingle(std::move(packet)));
peer,
NetworkDataSingle(
ReceivedPacket(std::move(packet.buf)), receiveTimePoint));
if (conn_->peerConnectionError) { if (conn_->peerConnectionError) {
closeImpl(QuicError( closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed")); QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed"));

View File

@@ -290,7 +290,7 @@ class TestQuicTransport
} else if (type == TestFrameType::DATAGRAM) { } else if (type == TestFrameType::DATAGRAM) {
auto buffer = decodeDatagramFrame(cursor); auto buffer = decodeDatagramFrame(cursor);
auto frame = DatagramFrame(buffer.second, std::move(buffer.first)); auto frame = DatagramFrame(buffer.second, std::move(buffer.first));
handleDatagram(*conn_, frame, data.receiveTimePoint); handleDatagram(*conn_, frame, data.packet.timings.receiveTimePoint);
} else if (type == TestFrameType::STREAM_GROUP) { } else if (type == TestFrameType::STREAM_GROUP) {
auto res = decodeStreamGroupBuffer(cursor); auto res = decodeStreamGroupBuffer(cursor);
QuicStreamState* stream = QuicStreamState* stream =

View File

@@ -143,7 +143,8 @@ void QuicClientTransport::processUDPData(
for (uint16_t processedPackets = 0; for (uint16_t processedPackets = 0;
!udpData.empty() && processedPackets < kMaxNumCoalescedPackets; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets;
processedPackets++) { processedPackets++) {
processPacketData(peer, networkData.receiveTimePoint, udpData); processPacketData(
peer, networkData.packet.timings.receiveTimePoint, udpData);
} }
VLOG_IF(4, !udpData.empty()) VLOG_IF(4, !udpData.empty())
<< "Leaving " << udpData.chainLength() << "Leaving " << udpData.chainLength()
@@ -158,7 +159,7 @@ void QuicClientTransport::processUDPData(
pendingPacket.append(std::move(pendingData.networkData.packet.buf)); pendingPacket.append(std::move(pendingData.networkData.packet.buf));
processPacketData( processPacketData(
pendingData.peer, pendingData.peer,
pendingData.networkData.receiveTimePoint, pendingData.networkData.packet.timings.receiveTimePoint,
pendingPacket); pendingPacket);
pendingPacket.move(); pendingPacket.move();
} }
@@ -171,7 +172,7 @@ void QuicClientTransport::processUDPData(
pendingPacket.append(std::move(pendingData.networkData.packet.buf)); pendingPacket.append(std::move(pendingData.networkData.packet.buf));
processPacketData( processPacketData(
pendingData.peer, pendingData.peer,
pendingData.networkData.receiveTimePoint, pendingData.networkData.packet.timings.receiveTimePoint,
pendingPacket); pendingPacket);
pendingPacket.move(); pendingPacket.move();
} }

View File

@@ -16,20 +16,36 @@
namespace quic { namespace quic {
struct ReceivedPacket { struct ReceivedPacket {
struct Timings {
// Legacy Receive TimePoint.
//
// This TimePoint is being deprecated in favor of having TimePoints that are
// named specifically for the timing event being tracked.
//
// The meaning of this TimePoint varies by transport implementation and
// settings: it can be the time when recv() started or completed, the socket
// RX timestamp for this packet or another packet received during the same
// call to recv*(), or something else entirely.
//
// TODO(bschlinker): Complete deprecation
TimePoint receiveTimePoint;
};
ReceivedPacket() = default; ReceivedPacket() = default;
explicit ReceivedPacket(Buf&& bufIn) : buf(std::move(bufIn)) {} explicit ReceivedPacket(Buf&& bufIn) : buf(std::move(bufIn)) {}
// data
Buf buf; Buf buf;
Timings timings;
}; };
struct NetworkData { struct NetworkData {
NetworkData() = default; NetworkData() = default;
NetworkData(Buf&& buf, const TimePoint& receiveTime) NetworkData(Buf&& buf, const TimePoint& receiveTimePointIn)
: receiveTimePoint_(receiveTime) { : receiveTimePoint_(receiveTimePointIn) {
if (buf) { if (buf) {
totalData_ = buf->computeChainDataLength(); totalData_ = buf->computeChainDataLength();
packets_.emplace_back(std::move(buf)); packets_.emplace_back(std::move(buf));
packets_.back().timings.receiveTimePoint = receiveTimePointIn;
} }
} }
@@ -37,12 +53,15 @@ struct NetworkData {
std::vector<Buf>&& packetBufs, std::vector<Buf>&& packetBufs,
const TimePoint& receiveTimePointIn) const TimePoint& receiveTimePointIn)
: receiveTimePoint_(receiveTimePointIn), : receiveTimePoint_(receiveTimePointIn),
packets_([&packetBufs]() { packets_([&packetBufs, &receiveTimePointIn]() {
std::vector<ReceivedPacket> result; std::vector<ReceivedPacket> result;
result.reserve(packetBufs.size()); result.reserve(packetBufs.size());
for (auto& packetBuf : packetBufs) { for (auto& packetBuf : packetBufs) {
result.emplace_back(std::move(packetBuf)); result.emplace_back(std::move(packetBuf));
} }
for (auto& packet : result) {
packet.timings.receiveTimePoint = receiveTimePointIn;
}
return result; return result;
}()), }()),
totalData_([this]() { totalData_([this]() {
@@ -59,6 +78,7 @@ struct NetworkData {
void addPacket(ReceivedPacket&& packetIn) { void addPacket(ReceivedPacket&& packetIn) {
packets_.emplace_back(std::move(packetIn)); packets_.emplace_back(std::move(packetIn));
packets_.back().timings.receiveTimePoint = receiveTimePoint_;
} }
[[nodiscard]] const std::vector<ReceivedPacket>& getPackets() const { [[nodiscard]] const std::vector<ReceivedPacket>& getPackets() const {
@@ -71,6 +91,9 @@ struct NetworkData {
void setReceiveTimePoint(const TimePoint& receiveTimePointIn) { void setReceiveTimePoint(const TimePoint& receiveTimePointIn) {
receiveTimePoint_ = receiveTimePointIn; receiveTimePoint_ = receiveTimePointIn;
for (auto& packet : packets_) {
packet.timings.receiveTimePoint = receiveTimePointIn;
}
} }
[[nodiscard]] TimePoint getReceiveTimePoint() const { [[nodiscard]] TimePoint getReceiveTimePoint() const {
@@ -105,15 +128,23 @@ struct NetworkData {
struct NetworkDataSingle { struct NetworkDataSingle {
ReceivedPacket packet; ReceivedPacket packet;
TimePoint receiveTimePoint;
size_t totalData{0}; size_t totalData{0};
NetworkDataSingle() = default; NetworkDataSingle() = default;
explicit NetworkDataSingle(ReceivedPacket&& packetIn)
: packet(std::move(packetIn)) {
if (packet.buf) {
totalData += packet.buf->computeChainDataLength();
}
}
// TODO(bschlinker): Deprecate
NetworkDataSingle( NetworkDataSingle(
ReceivedPacket&& packetIn, ReceivedPacket&& packetIn,
const TimePoint& receiveTimePointIn) const TimePoint& receiveTimePointIn)
: packet(std::move(packetIn)), receiveTimePoint(receiveTimePointIn) { : packet(std::move(packetIn)) {
packet.timings.receiveTimePoint = receiveTimePointIn;
if (packet.buf) { if (packet.buf) {
totalData += packet.buf->computeChainDataLength(); totalData += packet.buf->computeChainDataLength();
} }

View File

@@ -129,9 +129,11 @@ void TakeoverPacketHandler::setDestination(
void TakeoverPacketHandler::forwardPacketToAnotherServer( void TakeoverPacketHandler::forwardPacketToAnotherServer(
const folly::SocketAddress& peerAddress, const folly::SocketAddress& peerAddress,
Buf data, NetworkData&& networkData) {
const TimePoint& packetReceiveTime) { const TimePoint receiveTimePoint = networkData.getReceiveTimePoint();
// create buffer for the peerAddress address and clientPacketReceiveTime Buf buf = std::move(networkData).moveAllData();
// create buffer for the peerAddress address and receiveTimePoint
// Serialize: version (4B), socket(2 + 16)B and time of ack (8B) // Serialize: version (4B), socket(2 + 16)B and time of ack (8B)
auto bufSize = sizeof(TakeoverProtocolVersion) + sizeof(uint16_t) + auto bufSize = sizeof(TakeoverProtocolVersion) + sizeof(uint16_t) +
peerAddress.getActualSize() + sizeof(uint64_t); peerAddress.getActualSize() + sizeof(uint64_t);
@@ -142,9 +144,9 @@ void TakeoverPacketHandler::forwardPacketToAnotherServer(
uint16_t socklen = peerAddress.getAddress(&addrStorage); uint16_t socklen = peerAddress.getAddress(&addrStorage);
appender.writeBE<uint16_t>(socklen); appender.writeBE<uint16_t>(socklen);
appender.push((uint8_t*)&addrStorage, socklen); appender.push((uint8_t*)&addrStorage, socklen);
uint64_t tick = packetReceiveTime.time_since_epoch().count(); uint64_t tick = receiveTimePoint.time_since_epoch().count();
appender.writeBE<uint64_t>(tick); appender.writeBE<uint64_t>(tick);
writeBuffer->prependChain(std::move(data)); writeBuffer->prependChain(std::move(buf));
forwardPacket(std::move(writeBuffer)); forwardPacket(std::move(writeBuffer));
} }

View File

@@ -80,8 +80,7 @@ class TakeoverPacketHandler {
void forwardPacketToAnotherServer( void forwardPacketToAnotherServer(
const folly::SocketAddress& peerAddress, const folly::SocketAddress& peerAddress,
Buf data, NetworkData&& networkData);
const TimePoint& packetReceiveTime);
void processForwardedPacket(const folly::SocketAddress& client, Buf data); void processForwardedPacket(const folly::SocketAddress& client, Buf data);

View File

@@ -513,7 +513,7 @@ void QuicServerTransport::processPendingData(bool async) {
pendingPacket.peer, pendingPacket.peer,
NetworkData( NetworkData(
std::move(pendingPacket.networkData.packet.buf), std::move(pendingPacket.networkData.packet.buf),
pendingPacket.networkData.receiveTimePoint)); pendingPacket.networkData.packet.timings.receiveTimePoint));
if (serverPtr->closeState_ == CloseState::CLOSED) { if (serverPtr->closeState_ == CloseState::CLOSED) {
// The pending data could potentially contain a connection close, or // The pending data could potentially contain a connection close, or
// the app could have triggered a connection close with an error. It // the app could have triggered a connection close with an error. It

View File

@@ -590,9 +590,8 @@ void QuicServerWorker::forwardNetworkData(
"Forwarding packet with unknown connId version from client={} to another process, routingInfo={}", "Forwarding packet with unknown connId version from client={} to another process, routingInfo={}",
client.describe(), client.describe(),
logRoutingInfo(routingData.destinationConnId)); logRoutingInfo(routingData.destinationConnId));
auto recvTime = networkData.getReceiveTimePoint();
takeoverPktHandler_.forwardPacketToAnotherServer( takeoverPktHandler_.forwardPacketToAnotherServer(
client, std::move(networkData).moveAllData(), recvTime); client, std::move(networkData));
QUIC_STATS(statsCallback_, onPacketForwarded); QUIC_STATS(statsCallback_, onPacketForwarded);
return; return;
} else { } else {
@@ -795,9 +794,8 @@ void QuicServerWorker::dispatchPacketData(
"Forwarding packet from client={} to another process, routingInfo={}", "Forwarding packet from client={} to another process, routingInfo={}",
client.describe(), client.describe(),
logRoutingInfo(dstConnId)); logRoutingInfo(dstConnId));
auto recvTime = networkData.getReceiveTimePoint();
takeoverPktHandler_.forwardPacketToAnotherServer( takeoverPktHandler_.forwardPacketToAnotherServer(
client, std::move(networkData).moveAllData(), recvTime); client, std::move(networkData));
QUIC_STATS(statsCallback_, onPacketForwarded); QUIC_STATS(statsCallback_, onPacketForwarded);
}); });

View File

@@ -715,7 +715,7 @@ static void handleCipherUnavailable(
pendingReadData.peer = readData.peer; pendingReadData.peer = readData.peer;
pendingReadData.networkData = NetworkDataSingle( pendingReadData.networkData = NetworkDataSingle(
ReceivedPacket(std::move(originalData->packet)), ReceivedPacket(std::move(originalData->packet)),
readData.networkData.receiveTimePoint); readData.networkData.packet.timings.receiveTimePoint);
pendingData->emplace_back(std::move(pendingReadData)); pendingData->emplace_back(std::move(pendingReadData));
VLOG(10) << "Adding pending data to " VLOG(10) << "Adding pending data to "
<< toString(originalData->protectionType) << toString(originalData->protectionType)
@@ -1022,7 +1022,10 @@ void onServerReadDataFromOpen(
auto& ackState = getAckState(conn, packetNumberSpace); auto& ackState = getAckState(conn, packetNumberSpace);
uint64_t distanceFromExpectedPacketNum = updateLargestReceivedPacketNum( uint64_t distanceFromExpectedPacketNum = updateLargestReceivedPacketNum(
conn, ackState, packetNum, readData.networkData.receiveTimePoint); conn,
ackState,
packetNum,
readData.networkData.packet.timings.receiveTimePoint);
if (distanceFromExpectedPacketNum > 0) { if (distanceFromExpectedPacketNum > 0) {
QUIC_STATS(conn.statsCallback, onOutOfOrderPacketReceived); QUIC_STATS(conn.statsCallback, onOutOfOrderPacketReceived);
} }
@@ -1110,7 +1113,7 @@ void onServerReadDataFromOpen(
} }
}, },
markPacketLoss, markPacketLoss,
readData.networkData.receiveTimePoint)); readData.networkData.packet.timings.receiveTimePoint));
break; break;
} }
case QuicFrame::Type::RstStreamFrame: { case QuicFrame::Type::RstStreamFrame: {
@@ -1268,7 +1271,10 @@ void onServerReadDataFromOpen(
// Datagram isn't retransmittable. But we would like to ack them // Datagram isn't retransmittable. But we would like to ack them
// early. So, make Datagram frames count towards ack policy // early. So, make Datagram frames count towards ack policy
pktHasRetransmittableData = true; pktHasRetransmittableData = true;
handleDatagram(conn, frame, readData.networkData.receiveTimePoint); handleDatagram(
conn,
frame,
readData.networkData.packet.timings.receiveTimePoint);
break; break;
} }
case QuicFrame::Type::ImmediateAckFrame: { case QuicFrame::Type::ImmediateAckFrame: {