mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-11-27 03:41:14 +03:00
Cleanup and modularize receive path, improve timestamp support [10/x]
Summary: This diff makes the fields in `NetworkData` private so that they can only be manipulated via accessors. - This will be used in a subsequent diff to make it possible to populate a new `ReceivedPacket::Timings` structure in each `ReceivedPacket` held by a `NetworkData` object with the timing information currently held in `NetworkData.` - These accessors are temporary and being used to split up the stack, so relying on existing tests to provide coverage instead of adding new tests here. -- This diff is part of a larger stack focused on the following: - **Cleaning up client and server UDP packet receive paths while improving testability.** We currently have multiple receive paths for client and server. Capabilities vary significantly and there are few tests. For instance: - The server receive path supports socket RX timestamps, abet incorrectly in that it does not store timestamp per packet. In comparison, the client receive path does not currently support socket RX timestamps, although the code in `QuicClientTransport::recvmsg` and `QuicClientTransport::recvmmsg` makes reference to socket RX timestamps, making it confusing to understand the capabilities available when tracing through the code. This complicates the tests in `QuicTypedTransportTests`, as we have to disable test logic that depends on socket RX timestamps for client tests. - The client currently has three receive paths, and none of them are well tested. - **Modularize and abstract components in the receive path.** This will make it easier to mock/fake the UDP socket and network layers. - `QuicClientTransport` and `QuicServerTransport` currently contain UDP socket handling logic that operates over lower layer primitives such `cmsg` and `io_vec` (see `QuicClientTransport::recvmmsg` and `...::recvmsg` as examples). - Because this UDP socket handling logic is inside of the mvfst transport implementations, it is difficult to test this logic in isolation and mock/fake the underlying socket and network layers. For instance, injecting a user space network emulator that operates at the socket layer would require faking `folly::AsyncUDPSocket`, which is non-trivial given that `AsyncUDPSocket` does not abstract away intricacies arising from the aforementioned lower layer primitives. - By shifting this logic into an intermediate layer between the transport and the underlying UDP socket, it will be easier to mock out the UDP socket layer when testing functionality at higher layers, and inject fake components when we want to emulate the network between a mvfst client and server. It will also be easier for us to have unit tests focused on testing interactions between the UDP socket implementation and this intermediate layer. - **Improving receive path timestamping.** We only record a single timestamp per `NetworkData` at the moment, but (1) it is possible for a `NetworkData` to have multiple packets, each with their own timestamps, and (2) we should be able to record both userspace and socket timestamps. Reviewed By: jbeshay Differential Revision: D48724715 fbshipit-source-id: 0230ea3feea525fa15b908161f444f4f6d9d4b39
This commit is contained in:
committed by
Facebook GitHub Bot
parent
d1f042b967
commit
cb10ddf0ce
@@ -1873,7 +1873,7 @@ void QuicTransportBase::onNetworkData(
|
|||||||
updateWriteLooper(true);
|
updateWriteLooper(true);
|
||||||
};
|
};
|
||||||
try {
|
try {
|
||||||
conn_->lossState.totalBytesRecvd += networkData.totalData;
|
conn_->lossState.totalBytesRecvd += networkData.getTotalData();
|
||||||
auto originalAckVersion = currentAckStateVersion(*conn_);
|
auto originalAckVersion = currentAckStateVersion(*conn_);
|
||||||
|
|
||||||
// handle PacketsReceivedEvent if requested by observers
|
// handle PacketsReceivedEvent if requested by observers
|
||||||
@@ -1883,13 +1883,13 @@ void QuicTransportBase::onNetworkData(
|
|||||||
SocketObserverInterface::Events::packetsReceivedEvents>()) {
|
SocketObserverInterface::Events::packetsReceivedEvents>()) {
|
||||||
auto builder = SocketObserverInterface::PacketsReceivedEvent::Builder()
|
auto builder = SocketObserverInterface::PacketsReceivedEvent::Builder()
|
||||||
.setReceiveLoopTime(TimePoint::clock::now())
|
.setReceiveLoopTime(TimePoint::clock::now())
|
||||||
.setNumPacketsReceived(networkData.packets.size())
|
.setNumPacketsReceived(networkData.getPackets().size())
|
||||||
.setNumBytesReceived(networkData.totalData);
|
.setNumBytesReceived(networkData.getTotalData());
|
||||||
for (auto& packet : networkData.packets) {
|
for (auto& packet : networkData.getPackets()) {
|
||||||
builder.addReceivedPacket(
|
builder.addReceivedPacket(
|
||||||
SocketObserverInterface::PacketsReceivedEvent::ReceivedPacket::
|
SocketObserverInterface::PacketsReceivedEvent::ReceivedPacket::
|
||||||
Builder()
|
Builder()
|
||||||
.setPacketReceiveTime(networkData.receiveTimePoint)
|
.setPacketReceiveTime(networkData.getReceiveTimePoint())
|
||||||
.setPacketNumBytes(packet.buf->computeChainDataLength())
|
.setPacketNumBytes(packet.buf->computeChainDataLength())
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
@@ -1903,12 +1903,13 @@ void QuicTransportBase::onNetworkData(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto& packet : networkData.packets) {
|
const auto receiveTimePoint = networkData.getReceiveTimePoint();
|
||||||
|
auto packets = std::move(networkData).movePackets();
|
||||||
|
for (auto& packet : packets) {
|
||||||
onReadData(
|
onReadData(
|
||||||
peer,
|
peer,
|
||||||
NetworkDataSingle(
|
NetworkDataSingle(
|
||||||
ReceivedPacket(std::move(packet.buf)),
|
ReceivedPacket(std::move(packet.buf)), receiveTimePoint));
|
||||||
networkData.receiveTimePoint));
|
|
||||||
if (conn_->peerConnectionError) {
|
if (conn_->peerConnectionError) {
|
||||||
closeImpl(QuicError(
|
closeImpl(QuicError(
|
||||||
QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed"));
|
QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed"));
|
||||||
|
|||||||
@@ -1236,9 +1236,9 @@ void QuicClientTransport::recvMsg(
|
|||||||
size_t len = bytesRead;
|
size_t len = bytesRead;
|
||||||
size_t remaining = len;
|
size_t remaining = len;
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
size_t totalNumPackets =
|
size_t totalNumPackets = networkData.getPackets().size() +
|
||||||
networkData.packets.size() + ((len + params.gro - 1) / params.gro);
|
((len + params.gro - 1) / params.gro);
|
||||||
networkData.packets.reserve(totalNumPackets);
|
networkData.reserve(totalNumPackets);
|
||||||
while (remaining) {
|
while (remaining) {
|
||||||
if (static_cast<int>(remaining) > params.gro) {
|
if (static_cast<int>(remaining) > params.gro) {
|
||||||
auto tmp = readBuffer->cloneOne();
|
auto tmp = readBuffer->cloneOne();
|
||||||
@@ -1251,18 +1251,18 @@ void QuicClientTransport::recvMsg(
|
|||||||
|
|
||||||
offset += params.gro;
|
offset += params.gro;
|
||||||
remaining -= params.gro;
|
remaining -= params.gro;
|
||||||
networkData.packets.emplace_back(std::move(tmp));
|
networkData.addPacket(ReceivedPacket(std::move(tmp)));
|
||||||
} else {
|
} else {
|
||||||
// do not clone the last packet
|
// do not clone the last packet
|
||||||
// start at offset, use all the remaining data
|
// start at offset, use all the remaining data
|
||||||
readBuffer->trimStart(offset);
|
readBuffer->trimStart(offset);
|
||||||
DCHECK_EQ(readBuffer->length(), remaining);
|
DCHECK_EQ(readBuffer->length(), remaining);
|
||||||
remaining = 0;
|
remaining = 0;
|
||||||
networkData.packets.emplace_back(std::move(readBuffer));
|
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
networkData.packets.emplace_back(std::move(readBuffer));
|
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
|
||||||
}
|
}
|
||||||
trackDatagramReceived(bytesRead);
|
trackDatagramReceived(bytesRead);
|
||||||
}
|
}
|
||||||
@@ -1382,9 +1382,9 @@ void QuicClientTransport::recvMmsg(
|
|||||||
size_t len = bytesRead;
|
size_t len = bytesRead;
|
||||||
size_t remaining = len;
|
size_t remaining = len;
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
size_t totalNumPackets =
|
size_t totalNumPackets = networkData.getPackets().size() +
|
||||||
networkData.packets.size() + ((len + params.gro - 1) / params.gro);
|
((len + params.gro - 1) / params.gro);
|
||||||
networkData.packets.reserve(totalNumPackets);
|
networkData.reserve(totalNumPackets);
|
||||||
while (remaining) {
|
while (remaining) {
|
||||||
if (static_cast<int>(remaining) > params.gro) {
|
if (static_cast<int>(remaining) > params.gro) {
|
||||||
auto tmp = readBuffer->cloneOne();
|
auto tmp = readBuffer->cloneOne();
|
||||||
@@ -1397,18 +1397,18 @@ void QuicClientTransport::recvMmsg(
|
|||||||
|
|
||||||
offset += params.gro;
|
offset += params.gro;
|
||||||
remaining -= params.gro;
|
remaining -= params.gro;
|
||||||
networkData.packets.emplace_back(std::move(tmp));
|
networkData.addPacket(ReceivedPacket(std::move(tmp)));
|
||||||
} else {
|
} else {
|
||||||
// do not clone the last packet
|
// do not clone the last packet
|
||||||
// start at offset, use all the remaining data
|
// start at offset, use all the remaining data
|
||||||
readBuffer->trimStart(offset);
|
readBuffer->trimStart(offset);
|
||||||
DCHECK_EQ(readBuffer->length(), remaining);
|
DCHECK_EQ(readBuffer->length(), remaining);
|
||||||
remaining = 0;
|
remaining = 0;
|
||||||
networkData.packets.emplace_back(std::move(readBuffer));
|
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
networkData.packets.emplace_back(std::move(readBuffer));
|
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
|
||||||
}
|
}
|
||||||
|
|
||||||
trackDatagramReceived(bytesRead);
|
trackDatagramReceived(bytesRead);
|
||||||
@@ -1423,7 +1423,7 @@ void QuicClientTransport::onNotifyDataAvailable(
|
|||||||
const uint16_t numPackets = conn_->transportSettings.maxRecvBatchSize;
|
const uint16_t numPackets = conn_->transportSettings.maxRecvBatchSize;
|
||||||
|
|
||||||
NetworkData networkData;
|
NetworkData networkData;
|
||||||
networkData.packets.reserve(numPackets);
|
networkData.reserve(numPackets);
|
||||||
size_t totalData = 0;
|
size_t totalData = 0;
|
||||||
folly::Optional<folly::SocketAddress> server;
|
folly::Optional<folly::SocketAddress> server;
|
||||||
|
|
||||||
@@ -1432,7 +1432,7 @@ void QuicClientTransport::onNotifyDataAvailable(
|
|||||||
readBufferSize, numPackets, networkData, server, totalData);
|
readBufferSize, numPackets, networkData, server, totalData);
|
||||||
|
|
||||||
// track the received packets
|
// track the received packets
|
||||||
for (const auto& packet : networkData.packets) {
|
for (const auto& packet : networkData.getPackets()) {
|
||||||
if (!packet.buf) {
|
if (!packet.buf) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -1479,7 +1479,7 @@ void QuicClientTransport::onNotifyDataAvailable(
|
|||||||
recvMsg(sock, readBufferSize, numPackets, networkData, server, totalData);
|
recvMsg(sock, readBufferSize, numPackets, networkData, server, totalData);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (networkData.packets.empty()) {
|
if (networkData.getPackets().empty()) {
|
||||||
// recvMmsg and recvMsg might have already set the reason and counter
|
// recvMmsg and recvMsg might have already set the reason and counter
|
||||||
if (conn_->loopDetectorCallback) {
|
if (conn_->loopDetectorCallback) {
|
||||||
if (conn_->readDebugState.noReadReason == NoReadReason::READ_OK) {
|
if (conn_->readDebugState.noReadReason == NoReadReason::READ_OK) {
|
||||||
@@ -1497,8 +1497,8 @@ void QuicClientTransport::onNotifyDataAvailable(
|
|||||||
// TODO: we can get better receive time accuracy than this, with
|
// TODO: we can get better receive time accuracy than this, with
|
||||||
// SO_TIMESTAMP or SIOCGSTAMP.
|
// SO_TIMESTAMP or SIOCGSTAMP.
|
||||||
auto packetReceiveTime = Clock::now();
|
auto packetReceiveTime = Clock::now();
|
||||||
networkData.receiveTimePoint = packetReceiveTime;
|
networkData.setReceiveTimePoint(packetReceiveTime);
|
||||||
networkData.totalData = totalData;
|
networkData.setTotalData(totalData);
|
||||||
onNetworkData(*server, std::move(networkData));
|
onNetworkData(*server, std::move(networkData));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,24 +24,20 @@ struct ReceivedPacket {
|
|||||||
};
|
};
|
||||||
|
|
||||||
struct NetworkData {
|
struct NetworkData {
|
||||||
TimePoint receiveTimePoint;
|
|
||||||
std::vector<ReceivedPacket> packets;
|
|
||||||
size_t totalData{0};
|
|
||||||
|
|
||||||
NetworkData() = default;
|
NetworkData() = default;
|
||||||
NetworkData(Buf&& buf, const TimePoint& receiveTime)
|
NetworkData(Buf&& buf, const TimePoint& receiveTime)
|
||||||
: receiveTimePoint(receiveTime) {
|
: receiveTimePoint_(receiveTime) {
|
||||||
if (buf) {
|
if (buf) {
|
||||||
totalData = buf->computeChainDataLength();
|
totalData_ = buf->computeChainDataLength();
|
||||||
packets.emplace_back(std::move(buf));
|
packets_.emplace_back(std::move(buf));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
NetworkData(
|
NetworkData(
|
||||||
std::vector<Buf>&& packetBufs,
|
std::vector<Buf>&& packetBufs,
|
||||||
const TimePoint& receiveTimePointIn)
|
const TimePoint& receiveTimePointIn)
|
||||||
: receiveTimePoint(receiveTimePointIn),
|
: receiveTimePoint_(receiveTimePointIn),
|
||||||
packets([&packetBufs]() {
|
packets_([&packetBufs]() {
|
||||||
std::vector<ReceivedPacket> result;
|
std::vector<ReceivedPacket> result;
|
||||||
result.reserve(packetBufs.size());
|
result.reserve(packetBufs.size());
|
||||||
for (auto& packetBuf : packetBufs) {
|
for (auto& packetBuf : packetBufs) {
|
||||||
@@ -49,17 +45,49 @@ struct NetworkData {
|
|||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}()),
|
}()),
|
||||||
totalData([this]() {
|
totalData_([this]() {
|
||||||
size_t result = 0;
|
size_t result = 0;
|
||||||
for (const auto& packet : packets) {
|
for (const auto& packet : packets_) {
|
||||||
result += packet.buf->computeChainDataLength();
|
result += packet.buf->computeChainDataLength();
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}()) {}
|
}()) {}
|
||||||
|
|
||||||
|
void reserve(size_t size) {
|
||||||
|
packets_.reserve(size);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addPacket(ReceivedPacket&& packetIn) {
|
||||||
|
packets_.emplace_back(std::move(packetIn));
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]] const std::vector<ReceivedPacket>& getPackets() const {
|
||||||
|
return packets_;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<ReceivedPacket> movePackets() && {
|
||||||
|
return std::move(packets_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setReceiveTimePoint(const TimePoint& receiveTimePointIn) {
|
||||||
|
receiveTimePoint_ = receiveTimePointIn;
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]] TimePoint getReceiveTimePoint() const {
|
||||||
|
return receiveTimePoint_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setTotalData(const size_t totalDataIn) {
|
||||||
|
totalData_ = totalDataIn;
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]] size_t getTotalData() const {
|
||||||
|
return totalData_;
|
||||||
|
}
|
||||||
|
|
||||||
std::unique_ptr<folly::IOBuf> moveAllData() && {
|
std::unique_ptr<folly::IOBuf> moveAllData() && {
|
||||||
std::unique_ptr<folly::IOBuf> buf;
|
std::unique_ptr<folly::IOBuf> buf;
|
||||||
for (auto& packet : packets) {
|
for (auto& packet : packets_) {
|
||||||
if (buf) {
|
if (buf) {
|
||||||
buf->prependChain(std::move(packet.buf));
|
buf->prependChain(std::move(packet.buf));
|
||||||
} else {
|
} else {
|
||||||
@@ -68,6 +96,11 @@ struct NetworkData {
|
|||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
TimePoint receiveTimePoint_;
|
||||||
|
std::vector<ReceivedPacket> packets_;
|
||||||
|
size_t totalData_{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
struct NetworkDataSingle {
|
struct NetworkDataSingle {
|
||||||
|
|||||||
@@ -142,9 +142,9 @@ QuicAsyncUDPSocketWrapperImpl::recvMmsg(
|
|||||||
size_t len = bytesRead;
|
size_t len = bytesRead;
|
||||||
size_t remaining = len;
|
size_t remaining = len;
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
size_t totalNumPackets =
|
size_t totalNumPackets = networkData.getPackets().size() +
|
||||||
networkData.packets.size() + ((len + params.gro - 1) / params.gro);
|
((len + params.gro - 1) / params.gro);
|
||||||
networkData.packets.reserve(totalNumPackets);
|
networkData.reserve(totalNumPackets);
|
||||||
while (remaining) {
|
while (remaining) {
|
||||||
if (static_cast<int>(remaining) > params.gro) {
|
if (static_cast<int>(remaining) > params.gro) {
|
||||||
auto tmp = readBuffer->cloneOne();
|
auto tmp = readBuffer->cloneOne();
|
||||||
@@ -157,18 +157,18 @@ QuicAsyncUDPSocketWrapperImpl::recvMmsg(
|
|||||||
|
|
||||||
offset += params.gro;
|
offset += params.gro;
|
||||||
remaining -= params.gro;
|
remaining -= params.gro;
|
||||||
networkData.packets.emplace_back(std::move(tmp));
|
networkData.addPacket(ReceivedPacket(std::move(tmp)));
|
||||||
} else {
|
} else {
|
||||||
// do not clone the last packet
|
// do not clone the last packet
|
||||||
// start at offset, use all the remaining data
|
// start at offset, use all the remaining data
|
||||||
readBuffer->trimStart(offset);
|
readBuffer->trimStart(offset);
|
||||||
DCHECK_EQ(readBuffer->length(), remaining);
|
DCHECK_EQ(readBuffer->length(), remaining);
|
||||||
remaining = 0;
|
remaining = 0;
|
||||||
networkData.packets.emplace_back(std::move(readBuffer));
|
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
networkData.packets.emplace_back(std::move(readBuffer));
|
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -720,7 +720,7 @@ class QuicClientTransportTestBase : public virtual testing::Test {
|
|||||||
NetworkData&& data,
|
NetworkData&& data,
|
||||||
bool writes = true,
|
bool writes = true,
|
||||||
folly::SocketAddress* peer = nullptr) {
|
folly::SocketAddress* peer = nullptr) {
|
||||||
for (const auto& packet : data.packets) {
|
for (const auto& packet : data.getPackets()) {
|
||||||
deliverDataWithoutErrorCheck(
|
deliverDataWithoutErrorCheck(
|
||||||
peer == nullptr ? serverAddr : *peer, packet.buf->coalesce(), writes);
|
peer == nullptr ? serverAddr : *peer, packet.buf->coalesce(), writes);
|
||||||
}
|
}
|
||||||
@@ -756,7 +756,7 @@ class QuicClientTransportTestBase : public virtual testing::Test {
|
|||||||
NetworkData&& data,
|
NetworkData&& data,
|
||||||
bool writes = true,
|
bool writes = true,
|
||||||
folly::SocketAddress* peer = nullptr) {
|
folly::SocketAddress* peer = nullptr) {
|
||||||
for (const auto& packet : data.packets) {
|
for (const auto& packet : data.getPackets()) {
|
||||||
deliverData(
|
deliverData(
|
||||||
peer == nullptr ? serverAddr : *peer, packet.buf->coalesce(), writes);
|
peer == nullptr ? serverAddr : *peer, packet.buf->coalesce(), writes);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -590,7 +590,7 @@ void QuicServerWorker::forwardNetworkData(
|
|||||||
"Forwarding packet with unknown connId version from client={} to another process, routingInfo={}",
|
"Forwarding packet with unknown connId version from client={} to another process, routingInfo={}",
|
||||||
client.describe(),
|
client.describe(),
|
||||||
logRoutingInfo(routingData.destinationConnId));
|
logRoutingInfo(routingData.destinationConnId));
|
||||||
auto recvTime = networkData.receiveTimePoint;
|
auto recvTime = networkData.getReceiveTimePoint();
|
||||||
takeoverPktHandler_.forwardPacketToAnotherServer(
|
takeoverPktHandler_.forwardPacketToAnotherServer(
|
||||||
client, std::move(networkData).moveAllData(), recvTime);
|
client, std::move(networkData).moveAllData(), recvTime);
|
||||||
QUIC_STATS(statsCallback_, onPacketForwarded);
|
QUIC_STATS(statsCallback_, onPacketForwarded);
|
||||||
@@ -795,7 +795,7 @@ void QuicServerWorker::dispatchPacketData(
|
|||||||
"Forwarding packet from client={} to another process, routingInfo={}",
|
"Forwarding packet from client={} to another process, routingInfo={}",
|
||||||
client.describe(),
|
client.describe(),
|
||||||
logRoutingInfo(dstConnId));
|
logRoutingInfo(dstConnId));
|
||||||
auto recvTime = networkData.receiveTimePoint;
|
auto recvTime = networkData.getReceiveTimePoint();
|
||||||
takeoverPktHandler_.forwardPacketToAnotherServer(
|
takeoverPktHandler_.forwardPacketToAnotherServer(
|
||||||
client, std::move(networkData).moveAllData(), recvTime);
|
client, std::move(networkData).moveAllData(), recvTime);
|
||||||
QUIC_STATS(statsCallback_, onPacketForwarded);
|
QUIC_STATS(statsCallback_, onPacketForwarded);
|
||||||
@@ -879,7 +879,7 @@ void QuicServerWorker::dispatchPacketData(
|
|||||||
// This could be a new connection, add it in the map
|
// This could be a new connection, add it in the map
|
||||||
// verify that the initial packet is at least min initial bytes
|
// verify that the initial packet is at least min initial bytes
|
||||||
// to avoid amplification attacks. Also check CID sizes.
|
// to avoid amplification attacks. Also check CID sizes.
|
||||||
if (networkData.totalData < kMinInitialPacketSize ||
|
if (networkData.getTotalData() < kMinInitialPacketSize ||
|
||||||
!isValidConnIdLength(dstConnId)) {
|
!isValidConnIdLength(dstConnId)) {
|
||||||
// Don't even attempt to forward the packet, just drop it.
|
// Don't even attempt to forward the packet, just drop it.
|
||||||
VLOG(3) << "Dropping small initial packet from client=" << client;
|
VLOG(3) << "Dropping small initial packet from client=" << client;
|
||||||
@@ -889,7 +889,7 @@ void QuicServerWorker::dispatchPacketData(
|
|||||||
|
|
||||||
// If there is a token present, decrypt it (could be either a retry
|
// If there is a token present, decrypt it (could be either a retry
|
||||||
// token or a new token)
|
// token or a new token)
|
||||||
folly::io::Cursor cursor(networkData.packets.front().buf.get());
|
folly::io::Cursor cursor(networkData.getPackets().front().buf.get());
|
||||||
auto maybeEncryptedToken = maybeGetEncryptedToken(cursor);
|
auto maybeEncryptedToken = maybeGetEncryptedToken(cursor);
|
||||||
bool hasTokenSecret = transportSettings_.retryTokenSecret.hasValue();
|
bool hasTokenSecret = transportSettings_.retryTokenSecret.hasValue();
|
||||||
|
|
||||||
@@ -914,7 +914,7 @@ void QuicServerWorker::dispatchPacketData(
|
|||||||
// send a retry packet back to the client
|
// send a retry packet back to the client
|
||||||
if (!isValidRetryToken &&
|
if (!isValidRetryToken &&
|
||||||
((newConnRateLimiter_ &&
|
((newConnRateLimiter_ &&
|
||||||
newConnRateLimiter_->check(networkData.receiveTimePoint)) ||
|
newConnRateLimiter_->check(networkData.getReceiveTimePoint())) ||
|
||||||
(unfinishedHandshakeLimitFn_.has_value() &&
|
(unfinishedHandshakeLimitFn_.has_value() &&
|
||||||
globalUnfinishedHandshakes >= (*unfinishedHandshakeLimitFn_)()))) {
|
globalUnfinishedHandshakes >= (*unfinishedHandshakeLimitFn_)()))) {
|
||||||
QUIC_STATS(statsCallback_, onConnectionRateLimited);
|
QUIC_STATS(statsCallback_, onConnectionRateLimited);
|
||||||
@@ -948,7 +948,7 @@ void QuicServerWorker::sendResetPacket(
|
|||||||
// Only send resets in response to short header packets.
|
// Only send resets in response to short header packets.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto packetSize = networkData.totalData;
|
auto packetSize = networkData.getTotalData();
|
||||||
auto resetSize = std::min<uint16_t>(packetSize, kDefaultMaxUDPPayload);
|
auto resetSize = std::min<uint16_t>(packetSize, kDefaultMaxUDPPayload);
|
||||||
// Per the spec, less than 43 we should respond with packet size - 1.
|
// Per the spec, less than 43 we should respond with packet size - 1.
|
||||||
if (packetSize < 43) {
|
if (packetSize < 43) {
|
||||||
|
|||||||
@@ -44,9 +44,9 @@ namespace quic {
|
|||||||
namespace test {
|
namespace test {
|
||||||
|
|
||||||
MATCHER_P(NetworkDataMatches, networkData, "") {
|
MATCHER_P(NetworkDataMatches, networkData, "") {
|
||||||
for (size_t i = 0; i < arg.packets.size(); ++i) {
|
for (size_t i = 0; i < arg.getPackets().size(); ++i) {
|
||||||
folly::IOBufEqualTo eq;
|
folly::IOBufEqualTo eq;
|
||||||
bool equals = eq(*arg.packets[i].buf, networkData);
|
bool equals = eq(*arg.getPackets()[i].buf, networkData);
|
||||||
if (equals) {
|
if (equals) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -1971,8 +1971,8 @@ TEST_F(QuicServerWorkerTakeoverTest, QuicServerTakeoverProcessForwardedPkt) {
|
|||||||
EXPECT_EQ(addr.getPort(), clientAddr.getPort());
|
EXPECT_EQ(addr.getPort(), clientAddr.getPort());
|
||||||
// the original data should be extracted after processing takeover
|
// the original data should be extracted after processing takeover
|
||||||
// protocol related information
|
// protocol related information
|
||||||
EXPECT_EQ(networkData->packets.size(), 1);
|
EXPECT_EQ(networkData->getPackets().size(), 1);
|
||||||
EXPECT_TRUE(eq(*data, *(networkData->packets[0].buf)));
|
EXPECT_TRUE(eq(*data, *(networkData->getPackets()[0].buf)));
|
||||||
EXPECT_TRUE(isForwardedData);
|
EXPECT_TRUE(isForwardedData);
|
||||||
};
|
};
|
||||||
EXPECT_CALL(*takeoverWorkerCb_, routeDataToWorkerLong(_, _, _, _, _))
|
EXPECT_CALL(*takeoverWorkerCb_, routeDataToWorkerLong(_, _, _, _, _))
|
||||||
@@ -2163,9 +2163,9 @@ class QuicServerTest : public Test {
|
|||||||
.WillByDefault(Invoke(
|
.WillByDefault(Invoke(
|
||||||
[&, expected = std::shared_ptr<folly::IOBuf>(data->clone())](
|
[&, expected = std::shared_ptr<folly::IOBuf>(data->clone())](
|
||||||
auto, const auto& networkData) mutable {
|
auto, const auto& networkData) mutable {
|
||||||
EXPECT_GT(networkData.packets.size(), 0);
|
EXPECT_GT(networkData.getPackets().size(), 0);
|
||||||
EXPECT_TRUE(folly::IOBufEqualTo()(
|
EXPECT_TRUE(folly::IOBufEqualTo()(
|
||||||
*networkData.packets[0].buf, *expected));
|
*networkData.getPackets()[0].buf, *expected));
|
||||||
std::unique_lock<std::mutex> lg(m);
|
std::unique_lock<std::mutex> lg(m);
|
||||||
calledOnNetworkData = true;
|
calledOnNetworkData = true;
|
||||||
cv.notify_one();
|
cv.notify_one();
|
||||||
@@ -2324,9 +2324,9 @@ TEST_F(QuicServerTest, RouteDataFromDifferentThread) {
|
|||||||
|
|
||||||
EXPECT_CALL(*transport, onNetworkData(_, _))
|
EXPECT_CALL(*transport, onNetworkData(_, _))
|
||||||
.WillOnce(Invoke([&](auto, const auto& networkData) {
|
.WillOnce(Invoke([&](auto, const auto& networkData) {
|
||||||
EXPECT_GT(networkData.packets.size(), 0);
|
EXPECT_GT(networkData.getPackets().size(), 0);
|
||||||
EXPECT_TRUE(
|
EXPECT_TRUE(folly::IOBufEqualTo()(
|
||||||
folly::IOBufEqualTo()(*networkData.packets[0].buf, *initialData));
|
*networkData.getPackets()[0].buf, *initialData));
|
||||||
}));
|
}));
|
||||||
|
|
||||||
server_->routeDataToWorker(
|
server_->routeDataToWorker(
|
||||||
@@ -2424,9 +2424,9 @@ class QuicServerTakeoverTest : public Test {
|
|||||||
EXPECT_CALL(*transport, onNetworkData(_, _))
|
EXPECT_CALL(*transport, onNetworkData(_, _))
|
||||||
.WillOnce(Invoke(
|
.WillOnce(Invoke(
|
||||||
[&, expected = data.get()](auto, const auto& networkData) {
|
[&, expected = data.get()](auto, const auto& networkData) {
|
||||||
EXPECT_GT(networkData.packets.size(), 0);
|
EXPECT_GT(networkData.getPackets().size(), 0);
|
||||||
EXPECT_TRUE(folly::IOBufEqualTo()(
|
EXPECT_TRUE(folly::IOBufEqualTo()(
|
||||||
*networkData.packets[0].buf, *expected));
|
*networkData.getPackets()[0].buf, *expected));
|
||||||
baton.post();
|
baton.post();
|
||||||
}));
|
}));
|
||||||
return transport;
|
return transport;
|
||||||
@@ -2532,9 +2532,9 @@ class QuicServerTakeoverTest : public Test {
|
|||||||
EXPECT_CALL(*transportCbForOldServer, onNetworkData(_, _))
|
EXPECT_CALL(*transportCbForOldServer, onNetworkData(_, _))
|
||||||
.WillOnce(
|
.WillOnce(
|
||||||
Invoke([&, expected = data.get()](auto, const auto& networkData) {
|
Invoke([&, expected = data.get()](auto, const auto& networkData) {
|
||||||
EXPECT_GT(networkData.packets.size(), 0);
|
EXPECT_GT(networkData.getPackets().size(), 0);
|
||||||
EXPECT_TRUE(folly::IOBufEqualTo()(
|
EXPECT_TRUE(folly::IOBufEqualTo()(
|
||||||
*networkData.packets[0].buf, *expected));
|
*networkData.getPackets()[0].buf, *expected));
|
||||||
b1.post();
|
b1.post();
|
||||||
}));
|
}));
|
||||||
// new quic server receives the packet and forwards it
|
// new quic server receives the packet and forwards it
|
||||||
@@ -2995,9 +2995,9 @@ TEST_F(QuicServerTest, ZeroRttPacketRoute) {
|
|||||||
EXPECT_CALL(*transport, onNetworkData(_, _))
|
EXPECT_CALL(*transport, onNetworkData(_, _))
|
||||||
.WillOnce(Invoke(
|
.WillOnce(Invoke(
|
||||||
[&, expected = data.get()](auto, const auto& networkData) {
|
[&, expected = data.get()](auto, const auto& networkData) {
|
||||||
EXPECT_GT(networkData.packets.size(), 0);
|
EXPECT_GT(networkData.getPackets().size(), 0);
|
||||||
EXPECT_TRUE(folly::IOBufEqualTo()(
|
EXPECT_TRUE(folly::IOBufEqualTo()(
|
||||||
*networkData.packets[0].buf, *expected));
|
*networkData.getPackets()[0].buf, *expected));
|
||||||
b.post();
|
b.post();
|
||||||
}));
|
}));
|
||||||
return transport;
|
return transport;
|
||||||
@@ -3038,9 +3038,9 @@ TEST_F(QuicServerTest, ZeroRttPacketRoute) {
|
|||||||
folly::Baton<> b1;
|
folly::Baton<> b1;
|
||||||
auto verifyZeroRtt = [&](const folly::SocketAddress& peer,
|
auto verifyZeroRtt = [&](const folly::SocketAddress& peer,
|
||||||
const NetworkData& networkData) noexcept {
|
const NetworkData& networkData) noexcept {
|
||||||
EXPECT_GT(networkData.packets.size(), 0);
|
EXPECT_GT(networkData.getPackets().size(), 0);
|
||||||
EXPECT_EQ(peer, reader->getSocket().address());
|
EXPECT_EQ(peer, reader->getSocket().address());
|
||||||
EXPECT_TRUE(folly::IOBufEqualTo()(*data, *networkData.packets[0].buf));
|
EXPECT_TRUE(folly::IOBufEqualTo()(*data, *networkData.getPackets()[0].buf));
|
||||||
b1.post();
|
b1.post();
|
||||||
};
|
};
|
||||||
EXPECT_CALL(*transport, onNetworkData(_, _)).WillOnce(Invoke(verifyZeroRtt));
|
EXPECT_CALL(*transport, onNetworkData(_, _)).WillOnce(Invoke(verifyZeroRtt));
|
||||||
@@ -3091,7 +3091,7 @@ TEST_F(QuicServerTest, ZeroRttBeforeInitial) {
|
|||||||
EXPECT_CALL(*transport, onNetworkData(_, _))
|
EXPECT_CALL(*transport, onNetworkData(_, _))
|
||||||
.Times(2)
|
.Times(2)
|
||||||
.WillRepeatedly(Invoke([&](auto, auto& networkData) {
|
.WillRepeatedly(Invoke([&](auto, auto& networkData) {
|
||||||
for (auto& packet : networkData.packets) {
|
for (const auto& packet : networkData.getPackets()) {
|
||||||
receivedData.emplace_back(packet.buf->clone());
|
receivedData.emplace_back(packet.buf->clone());
|
||||||
}
|
}
|
||||||
if (receivedData.size() == 2) {
|
if (receivedData.size() == 2) {
|
||||||
|
|||||||
Reference in New Issue
Block a user