1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-11-27 03:41:14 +03:00

Cleanup and modularize receive path, improve timestamp support [1/x]

Summary:
This diff:
- Changes `NetworkData` to have `vector<ReceivedPacket>` instead of `vector<IOBuf>` to make it easier to associate metadata with individual UDP packets.

--

This diff is part of a larger stack focused on the following:

- **Cleaning up client and server UDP packet receive paths while improving testability.** We currently have multiple receive paths for client and server. Capabilities vary significantly and there are few tests. For instance:
  - The server receive path supports socket RX timestamps, abet incorrectly in that it does not store timestamp per packet. In comparison, the client receive path does not currently support socket RX timestamps, although the code in `QuicClientTransport::recvmsg` and `QuicClientTransport::recvmmsg` makes reference to socket RX timestamps, making it confusing to understand the capabilities available when tracing through the code. This complicates the tests in `QuicTypedTransportTests`, as we have to disable test logic that depends on socket RX timestamps for client tests.
  - The client currently has three receive paths, and none of them are well tested.

- **Modularize and abstract components in the receive path.** This will make it easier to mock/fake the UDP socket and network layers.
  - `QuicClientTransport` and `QuicServerTransport` currently contain UDP socket handling logic that operates over lower layer primitives such `cmsg` and `io_vec` (see `QuicClientTransport::recvmmsg` and `...::recvmsg` as examples).
  - Because this UDP socket handling logic is inside of the mvfst transport implementations, it is difficult to test this logic in isolation and mock/fake the underlying socket and network layers. For instance, injecting a user space network emulator that operates at the socket layer would require faking `folly::AsyncUDPSocket`, which is non-trivial given that `AsyncUDPSocket` does not abstract away intricacies arising from the aforementioned lower layer primitives.
  - By shifting this logic into an intermediate layer between the transport and the underlying UDP socket, it will be easier to mock out the UDP socket layer when testing functionality at higher layers, and inject fake components when we want to emulate the network between a mvfst client and server. It will also be easier for us to have unit tests focused on testing interactions between the UDP socket implementation and this intermediate layer.

- **Improving receive path timestamping.** We only record a single timestamp per `NetworkData` at the moment, but (1) it is possible for a `NetworkData` to have multiple packets, each with their own timestamps, and (2) we should be able to record both userspace and socket timestamps.

Reviewed By: mjoras

Differential Revision: D48714615

fbshipit-source-id: b21a88f14156ed6398308223f50b2334328586ee
This commit is contained in:
Brandon Schlinker
2023-09-21 07:57:58 -07:00
committed by Facebook GitHub Bot
parent 8d9c19f1b2
commit a2c43bcf4c
5 changed files with 46 additions and 28 deletions

View File

@@ -1888,7 +1888,7 @@ void QuicTransportBase::onNetworkData(
SocketObserverInterface::PacketsReceivedEvent::ReceivedPacket::
Builder()
.setPacketReceiveTime(networkData.receiveTimePoint)
.setPacketNumBytes(packet->computeChainDataLength())
.setPacketNumBytes(packet.buf->computeChainDataLength())
.build());
}
@@ -1904,7 +1904,8 @@ void QuicTransportBase::onNetworkData(
for (auto& packet : networkData.packets) {
onReadData(
peer,
NetworkDataSingle(std::move(packet), networkData.receiveTimePoint));
NetworkDataSingle(
std::move(packet.buf), networkData.receiveTimePoint));
if (conn_->peerConnectionError) {
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed"));

View File

@@ -727,9 +727,9 @@ class QuicClientTransportTestBase : public virtual testing::Test {
NetworkData&& data,
bool writes = true,
folly::SocketAddress* peer = nullptr) {
for (const auto& packetBuf : data.packets) {
for (const auto& packet : data.packets) {
deliverDataWithoutErrorCheck(
peer == nullptr ? serverAddr : *peer, packetBuf->coalesce(), writes);
peer == nullptr ? serverAddr : *peer, packet.buf->coalesce(), writes);
}
}
@@ -763,9 +763,9 @@ class QuicClientTransportTestBase : public virtual testing::Test {
NetworkData&& data,
bool writes = true,
folly::SocketAddress* peer = nullptr) {
for (const auto& packetBuf : data.packets) {
for (const auto& packet : data.packets) {
deliverData(
peer == nullptr ? serverAddr : *peer, packetBuf->coalesce(), writes);
peer == nullptr ? serverAddr : *peer, packet.buf->coalesce(), writes);
}
}

View File

@@ -885,7 +885,7 @@ void QuicServerWorker::dispatchPacketData(
// If there is a token present, decrypt it (could be either a retry
// token or a new token)
folly::io::Cursor cursor(networkData.packets.front().get());
folly::io::Cursor cursor(networkData.packets.front().buf.get());
auto maybeEncryptedToken = maybeGetEncryptedToken(cursor);
bool hasTokenSecret = transportSettings_.retryTokenSecret.hasValue();

View File

@@ -46,7 +46,7 @@ namespace test {
MATCHER_P(NetworkDataMatches, networkData, "") {
for (size_t i = 0; i < arg.packets.size(); ++i) {
folly::IOBufEqualTo eq;
bool equals = eq(*arg.packets[i], networkData);
bool equals = eq(*arg.packets[i].buf, networkData);
if (equals) {
return true;
}
@@ -1974,7 +1974,7 @@ TEST_F(QuicServerWorkerTakeoverTest, QuicServerTakeoverProcessForwardedPkt) {
// the original data should be extracted after processing takeover
// protocol related information
EXPECT_EQ(networkData->packets.size(), 1);
EXPECT_TRUE(eq(*data, *(networkData->packets[0])));
EXPECT_TRUE(eq(*data, *(networkData->packets[0].buf)));
EXPECT_TRUE(isForwardedData);
};
EXPECT_CALL(*takeoverWorkerCb_, routeDataToWorkerLong(_, _, _, _, _))
@@ -2168,7 +2168,7 @@ class QuicServerTest : public Test {
auto, const auto& networkData) mutable {
EXPECT_GT(networkData.packets.size(), 0);
EXPECT_TRUE(folly::IOBufEqualTo()(
*networkData.packets[0], *expected));
*networkData.packets[0].buf, *expected));
std::unique_lock<std::mutex> lg(m);
calledOnNetworkData = true;
cv.notify_one();
@@ -2329,7 +2329,7 @@ TEST_F(QuicServerTest, RouteDataFromDifferentThread) {
.WillOnce(Invoke([&](auto, const auto& networkData) {
EXPECT_GT(networkData.packets.size(), 0);
EXPECT_TRUE(
folly::IOBufEqualTo()(*networkData.packets[0], *initialData));
folly::IOBufEqualTo()(*networkData.packets[0].buf, *initialData));
}));
server_->routeDataToWorker(
@@ -2429,7 +2429,7 @@ class QuicServerTakeoverTest : public Test {
[&, expected = data.get()](auto, const auto& networkData) {
EXPECT_GT(networkData.packets.size(), 0);
EXPECT_TRUE(folly::IOBufEqualTo()(
*networkData.packets[0], *expected));
*networkData.packets[0].buf, *expected));
baton.post();
}));
return transport;
@@ -2536,8 +2536,8 @@ class QuicServerTakeoverTest : public Test {
.WillOnce(
Invoke([&, expected = data.get()](auto, const auto& networkData) {
EXPECT_GT(networkData.packets.size(), 0);
EXPECT_TRUE(
folly::IOBufEqualTo()(*networkData.packets[0], *expected));
EXPECT_TRUE(folly::IOBufEqualTo()(
*networkData.packets[0].buf, *expected));
b1.post();
}));
// new quic server receives the packet and forwards it
@@ -3000,7 +3000,7 @@ TEST_F(QuicServerTest, ZeroRttPacketRoute) {
[&, expected = data.get()](auto, const auto& networkData) {
EXPECT_GT(networkData.packets.size(), 0);
EXPECT_TRUE(folly::IOBufEqualTo()(
*networkData.packets[0], *expected));
*networkData.packets[0].buf, *expected));
b.post();
}));
return transport;
@@ -3043,7 +3043,7 @@ TEST_F(QuicServerTest, ZeroRttPacketRoute) {
const NetworkData& networkData) noexcept {
EXPECT_GT(networkData.packets.size(), 0);
EXPECT_EQ(peer, reader->getSocket().address());
EXPECT_TRUE(folly::IOBufEqualTo()(*data, *networkData.packets[0]));
EXPECT_TRUE(folly::IOBufEqualTo()(*data, *networkData.packets[0].buf));
b1.post();
};
EXPECT_CALL(*transport, onNetworkData(_, _)).WillOnce(Invoke(verifyZeroRtt));
@@ -3094,8 +3094,8 @@ TEST_F(QuicServerTest, ZeroRttBeforeInitial) {
EXPECT_CALL(*transport, onNetworkData(_, _))
.Times(2)
.WillRepeatedly(Invoke([&](auto, auto& networkData) {
for (auto& buf : networkData.packets) {
receivedData.emplace_back(buf->clone());
for (auto& packet : networkData.packets) {
receivedData.emplace_back(packet.buf->clone());
}
if (receivedData.size() == 2) {
b.post();

View File

@@ -43,9 +43,17 @@
#include <queue>
namespace quic {
struct ReceivedPacket {
explicit ReceivedPacket(Buf&& bufIn) : buf(std::move(bufIn)) {}
// data
Buf buf;
};
struct NetworkData {
TimePoint receiveTimePoint;
std::vector<Buf> packets;
std::vector<ReceivedPacket> packets;
size_t totalData{0};
NetworkData() = default;
@@ -59,20 +67,29 @@ struct NetworkData {
NetworkData(std::vector<Buf>&& packetBufs, const TimePoint& receiveTime)
: receiveTimePoint(receiveTime),
packets(std::move(packetBufs)),
totalData(0) {
for (const auto& buf : packets) {
totalData += buf->computeChainDataLength();
packets([&packetBufs]() {
std::vector<ReceivedPacket> result;
result.reserve(packetBufs.size());
for (auto& packetBuf : packetBufs) {
result.emplace_back(std::move(packetBuf));
}
return result;
}()),
totalData([this]() {
size_t result = 0;
for (const auto& packet : packets) {
result += packet.buf->computeChainDataLength();
}
return result;
}()) {}
std::unique_ptr<folly::IOBuf> moveAllData() && {
std::unique_ptr<folly::IOBuf> buf;
for (size_t i = 0; i < packets.size(); ++i) {
for (auto& packet : packets) {
if (buf) {
buf->prependChain(std::move(packets[i]));
buf->prependChain(std::move(packet.buf));
} else {
buf = std::move(packets[i]);
buf = std::move(packet.buf);
}
}
return buf;