1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-11-24 04:01:07 +03:00
Files
mvfst/quic/state/QuicStateFunctions.cpp
Brandon Schlinker d9f0575c62 Cleanup and modularize receive path, improve timestamp support [19/x]
Summary:
This diff renames `updateLargestReceivedPacketNum` to `addPacketToAckState`, and changes the function signature so that it takes the structure of `ReceivedPacket::Timings` instead of just a single receive timestamp. This allows us to plumb through steady clock and socket timestamps.

The current caller of `updateLargestReceivedPacketNum` had a `ReceivedPacket::Timings` and just passed the `receiveTimePoint` field from that structure to `updateLargestReceivedPacketNum`:

```
uint64_t distanceFromExpectedPacketNum = updateLargestReceivedPacketNum(
    conn, ackState, packetNum, readData.udpPacket.timings.receiveTimePoint);
```

Now, we just pass the entire `ReceivedPacket::Timings` structure and extract `receiveTimePoint` inside:

```
uint64_t distanceFromExpectedPacketNum = addPacketToAckState(
    conn, ackState, packetNum, readData.udpPacket.timings);
```

--

This diff is part of a larger stack focused on the following:

- **Cleaning up client and server UDP packet receive paths while improving testability.** We currently have multiple receive paths for client and server. Capabilities vary significantly and there are few tests. For instance:
  - The server receive path supports socket RX timestamps, albeit incorrectly, in that it does not store a timestamp per packet. In comparison, the client receive path does not currently support socket RX timestamps, although the code in `QuicClientTransport::recvmsg` and `QuicClientTransport::recvmmsg` makes reference to socket RX timestamps, making it confusing to understand the capabilities available when tracing through the code. This complicates the tests in `QuicTypedTransportTests`, as we have to disable test logic that depends on socket RX timestamps for client tests.
  - The client currently has three receive paths, and none of them are well tested.

- **Modularize and abstract components in the receive path.** This will make it easier to mock/fake the UDP socket and network layers.
  - `QuicClientTransport` and `QuicServerTransport` currently contain UDP socket handling logic that operates over lower-layer primitives such as `cmsg` and `io_vec` (see `QuicClientTransport::recvmmsg` and `...::recvmsg` as examples).
  - Because this UDP socket handling logic is inside of the mvfst transport implementations, it is difficult to test this logic in isolation and mock/fake the underlying socket and network layers. For instance, injecting a user space network emulator that operates at the socket layer would require faking `folly::AsyncUDPSocket`, which is non-trivial given that `AsyncUDPSocket` does not abstract away intricacies arising from the aforementioned lower layer primitives.
  - By shifting this logic into an intermediate layer between the transport and the underlying UDP socket, it will be easier to mock out the UDP socket layer when testing functionality at higher layers, and inject fake components when we want to emulate the network between a mvfst client and server. It will also be easier for us to have unit tests focused on testing interactions between the UDP socket implementation and this intermediate layer.

- **Improving receive path timestamping.** We only record a single timestamp per `NetworkData` at the moment, but (1) it is possible for a `NetworkData` to have multiple packets, each with their own timestamps, and (2) we should be able to record both userspace and socket timestamps.

Differential Revision: D48785086

fbshipit-source-id: 48a424e3e27918a8efe41918e0bcfa57337d9337
2023-11-28 07:47:57 -08:00

471 lines
17 KiB
C++

/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/state/QuicStateFunctions.h>
#include <quic/state/QuicStreamFunctions.h>
#include <quic/common/TimeUtil.h>
namespace {

/**
 * Searches conn.outstandings.packets backwards, starting at `from`, for the
 * first packet in `packetNumberSpace` that has not been declared lost.
 *
 * @return an iterator to that packet, or conn.outstandings.packets.rend() if
 *         no such packet exists.
 */
std::deque<quic::OutstandingPacketWrapper>::reverse_iterator
getPreviousOutstandingPacket(
    quic::QuicConnectionStateBase& conn,
    quic::PacketNumberSpace packetNumberSpace,
    std::deque<quic::OutstandingPacketWrapper>::reverse_iterator from) {
  const auto notLostInSpace = [packetNumberSpace](const auto& outstanding) {
    if (outstanding.declaredLost) {
      return false;
    }
    return outstanding.packet.header.getPacketNumberSpace() ==
        packetNumberSpace;
  };
  return std::find_if(from, conn.outstandings.packets.rend(), notLostInSpace);
}

/**
 * Same search as getPreviousOutstandingPacket(), except that packets which
 * were declared lost also match.
 */
std::deque<quic::OutstandingPacketWrapper>::reverse_iterator
getPreviousOutstandingPacketIncludingLost(
    quic::QuicConnectionStateBase& conn,
    quic::PacketNumberSpace packetNumberSpace,
    std::deque<quic::OutstandingPacketWrapper>::reverse_iterator from) {
  const auto inSpace = [packetNumberSpace](const auto& outstanding) {
    return outstanding.packet.header.getPacketNumberSpace() ==
        packetNumberSpace;
  };
  return std::find_if(from, conn.outstandings.packets.rend(), inSpace);
}

} // namespace
namespace quic {
/**
 * Folds a new RTT sample into conn.lossState (RFC 9002, Section 5).
 *
 * Fields updated:
 *  - mrtt: running minimum of raw samples; ACK delay is never subtracted.
 *  - maybeMrttNoAckDelay: running minimum of (rttSample - ackDelay), only for
 *    samples where that subtraction does not go negative.
 *  - lrtt / maybeLrtt / maybeLrttAckDelay: the latest sample and its delay.
 *  - maxAckDelay: the largest ackDelay seen so far.
 *  - srtt / rttvar: smoothed RTT and RTT variation, fed with an "adjusted"
 *    sample (see below). While srtt is still zero, the sample seeds both.
 *
 * When conn.qLogger is set, a metric update is logged.
 *
 * @param conn       connection whose loss state is updated.
 * @param rttSample  latest RTT sample.
 * @param ackDelay   ACK delay the peer reported for this sample.
 */
void updateRtt(
    QuicConnectionStateBase& conn,
    const std::chrono::microseconds rttSample,
    const std::chrono::microseconds ackDelay) {
  auto& lossState = conn.lossState;

  // mrtt ignores ACK delay (RFC 9002, Section 5.2).
  lossState.mrtt = timeMin(lossState.mrtt, rttSample);

  // Separate min RTT built only from samples with the ACK delay removed.
  if (rttSample >= ackDelay) {
    const auto rttSampleNoAckDelay = rttSample - ackDelay;
    lossState.maybeMrttNoAckDelay = lossState.maybeMrttNoAckDelay
        ? std::min(*lossState.maybeMrttNoAckDelay, rttSampleNoAckDelay)
        : rttSampleNoAckDelay;
  }

  // Latest sample and the ACK delay that came with it.
  lossState.lrtt = rttSample;
  lossState.maybeLrtt = rttSample;
  lossState.maybeLrttAckDelay = ackDelay;

  lossState.maxAckDelay = timeMax(lossState.maxAckDelay, ackDelay);

  // RFC 9002, Section 5.3: subtract the ACK delay only if the result is not
  // smaller than min_rtt, i.e. rttSample >= min_rtt + ackDelay. This limits
  // underestimation of srtt caused by a misreporting peer.
  //
  // The comparison is written as (rttSample - ackDelay >= mrtt), guarded by
  // rttSample >= ackDelay. This form avoids computing mrtt + ackDelay, which
  // could overflow if the peer reports a very large ACK delay.
  //
  // When rttSample is the new minimum (e.g. the first sample), mrtt equals
  // rttSample. The condition then holds only for a zero ackDelay, so nothing
  // is effectively subtracted.
  const bool subtractAckDelay =
      rttSample >= ackDelay && rttSample - ackDelay >= lossState.mrtt;
  const auto adjustedRtt = subtractAckDelay ? rttSample - ackDelay : rttSample;

  if (lossState.srtt == 0us) {
    // No smoothed estimate yet: seed it from this sample.
    lossState.srtt = adjustedRtt;
    lossState.rttvar = adjustedRtt / 2;
  } else {
    // EWMA updates with weights 1/kRttBeta (rttvar) and 1/kRttAlpha (srtt).
    // rttvar must use the previous srtt, so it is updated first.
    const auto rttDeviation = lossState.srtt > adjustedRtt
        ? lossState.srtt - adjustedRtt
        : adjustedRtt - lossState.srtt;
    lossState.rttvar = lossState.rttvar * (kRttBeta - 1) / kRttBeta +
        rttDeviation / kRttBeta;
    lossState.srtt = lossState.srtt * (kRttAlpha - 1) / kRttAlpha +
        adjustedRtt / kRttAlpha;
  }

  if (conn.qLogger) {
    conn.qLogger->addMetricUpdate(
        rttSample, lossState.mrtt, lossState.srtt, ackDelay);
  }
}
// Decides, after a packet was received in the space tracked by `ackState`,
// whether an ACK must be sent immediately or an ACK timeout should be armed.
//
// Parameters:
//   distanceFromExpectedPacketNum - compared against
//     ackState.reorderThreshold; exceeding it forces an immediate ACK.
//   pktHasRetransmittableData - selects which counter the packet bumps:
//     numRxPacketsRecvd (true) or numNonRxPacketsRecvd (false).
//   pktHasCryptoData - must imply pktHasRetransmittableData (DCHECKed). Forces
//     an immediate ACK, except on a server when initPktNumSpace is true.
//   initPktNumSpace - presumably true for packets in the Initial packet number
//     space (only used for the server crypto-ACK exemption) - confirm at caller.
//
// Side effects: updates ackState.numRxPacketsRecvd,
// ackState.numNonRxPacketsRecvd, ackState.needsToSendAckImmediately and
// conn.pendingEvents.scheduleAckTimeout. Both counters are reset to 0 whenever
// needsToSendAckImmediately is true on exit.
void updateAckSendStateOnRecvPacket(
QuicConnectionStateBase& conn,
AckState& ackState,
uint64_t distanceFromExpectedPacketNum,
bool pktHasRetransmittableData,
bool pktHasCryptoData,
bool initPktNumSpace) {
DCHECK(!pktHasCryptoData || pktHasRetransmittableData);
// Packet-count threshold that triggers an immediate ACK. It starts at the
// non-retransmittable default. Once retransmittable data is involved (this
// packet, or retransmittable packets already pending), it switches to the
// peer-provided tolerance if set. Otherwise a transport setting is chosen by
// whether the largest received packet number is past
// rxPacketsBeforeAckInitThreshold.
auto thresh = kNonRtxRxPacketsPendingBeforeAck;
if (pktHasRetransmittableData || ackState.numRxPacketsRecvd) {
if (ackState.tolerance.hasValue()) {
thresh = ackState.tolerance.value();
} else {
thresh = ackState.largestRecvdPacketNum.value_or(0) >
conn.transportSettings.rxPacketsBeforeAckInitThreshold
? conn.transportSettings.rxPacketsBeforeAckAfterInit
: conn.transportSettings.rxPacketsBeforeAckBeforeInit;
}
}
// A packet that lands too far from the expected packet number forces an
// immediate ACK.
bool exceedsReorderThreshold =
distanceFromExpectedPacketNum > ackState.reorderThreshold;
if (pktHasRetransmittableData) {
// Servers do not ACK crypto data in the initial space immediately.
bool skipCryptoAck =
conn.nodeType == QuicNodeType::Server && initPktNumSpace;
// `||` short-circuits: numRxPacketsRecvd is NOT incremented when an earlier
// condition is already true. That is harmless, because needsToSendAckImmediately
// is then set and both counters are reset at the end of this function.
// The threshold compares the combined count of retransmittable and
// non-retransmittable packets.
if ((pktHasCryptoData && !skipCryptoAck) || exceedsReorderThreshold ||
++ackState.numRxPacketsRecvd + ackState.numNonRxPacketsRecvd >=
thresh) {
VLOG(10) << conn
<< " ack immediately because packet threshold pktHasCryptoData="
<< pktHasCryptoData << " pktHasRetransmittableData="
<< static_cast<int>(pktHasRetransmittableData)
<< " numRxPacketsRecvd="
<< static_cast<int>(ackState.numRxPacketsRecvd)
<< " numNonRxPacketsRecvd="
<< static_cast<int>(ackState.numNonRxPacketsRecvd);
conn.pendingEvents.scheduleAckTimeout = false;
ackState.needsToSendAckImmediately = true;
} else if (!ackState.needsToSendAckImmediately) {
// Below threshold: arm the ACK timer, unless an immediate ACK is already
// pending.
VLOG(10) << conn << " scheduling ack timeout pktHasCryptoData="
<< pktHasCryptoData << " pktHasRetransmittableData="
<< static_cast<int>(pktHasRetransmittableData)
<< " numRxPacketsRecvd="
<< static_cast<int>(ackState.numRxPacketsRecvd)
<< " numNonRxPacketsRecvd="
<< static_cast<int>(ackState.numNonRxPacketsRecvd);
conn.pendingEvents.scheduleAckTimeout = true;
}
// Non-retransmittable packets never arm the ACK timer here. They only
// trigger an immediate ACK once the combined count reaches the threshold.
} else if (
++ackState.numNonRxPacketsRecvd + ackState.numRxPacketsRecvd >= thresh) {
VLOG(10)
<< conn
<< " ack immediately because exceeds nonrx threshold numNonRxPacketsRecvd="
<< static_cast<int>(ackState.numNonRxPacketsRecvd)
<< " numRxPacketsRecvd="
<< static_cast<int>(ackState.numRxPacketsRecvd);
conn.pendingEvents.scheduleAckTimeout = false;
ackState.needsToSendAckImmediately = true;
}
// Restart both counters whenever an immediate ACK is pending, whether it was
// requested above or before this call.
if (ackState.needsToSendAckImmediately) {
ackState.numRxPacketsRecvd = 0;
ackState.numNonRxPacketsRecvd = 0;
}
}
/**
 * Handles expiry of the ACK timer for the AppData space.
 *
 * Requests an immediate ACK, clears both AppData received-packet counters,
 * and marks the ACK timeout as no longer scheduled.
 */
void updateAckStateOnAckTimeout(QuicConnectionStateBase& conn) {
  VLOG(10) << conn << " ack immediately due to ack timeout";
  auto& appDataAckState = conn.ackStates.appDataAckState;
  appDataAckState.needsToSendAckImmediately = true;
  appDataAckState.numRxPacketsRecvd = 0;
  appDataAckState.numNonRxPacketsRecvd = 0;
  conn.pendingEvents.scheduleAckTimeout = false;
}
/**
 * Bookkeeping after a packet carrying ACKs for `ackState` has been sent.
 *
 * Cancels any pending immediate ACK and ACK timeout, resets the
 * received-packet counters, and records `largestAckScheduled`.
 */
void updateAckSendStateOnSentPacketWithAcks(
    QuicConnectionStateBase& conn,
    AckState& ackState,
    PacketNum largestAckScheduled) {
  VLOG(10) << conn << " unset ack immediately due to sending packet with acks";
  conn.pendingEvents.scheduleAckTimeout = false;
  ackState.needsToSendAckImmediately = false;

  // The ACK just sent most likely covers the largest received packet, so the
  // counters start over. Because the ACK threshold is small, this assumes all
  // needed ACK ranges fit into one packet. If they do not, some packets may go
  // unacknowledged and the peer might retransmit them.
  ackState.numRxPacketsRecvd = 0;
  ackState.numNonRxPacketsRecvd = 0;

  ackState.largestAckScheduled = largestAckScheduled;
}
/**
 * Returns true only if pacing is enabled in the transport settings, the
 * connection allows pacing (canBePaced), and a pacer is present.
 */
bool isConnectionPaced(const QuicConnectionStateBase& conn) noexcept {
  if (!conn.transportSettings.pacingEnabled || !conn.canBePaced) {
    return false;
  }
  return static_cast<bool>(conn.pacer);
}
/**
 * Returns the mutable ACK state for `pnSpace`.
 *
 * The Initial and Handshake states are held through pointers and this
 * CHECK-fails if the corresponding pointer is null. The AppData state is a
 * direct member and is always returned.
 */
AckState& getAckState(
    QuicConnectionStateBase& conn,
    PacketNumberSpace pnSpace) noexcept {
  switch (pnSpace) {
    case PacketNumberSpace::AppData:
      return conn.ackStates.appDataAckState;
    case PacketNumberSpace::Handshake:
      return *CHECK_NOTNULL(conn.ackStates.handshakeAckState.get());
    case PacketNumberSpace::Initial:
      return *CHECK_NOTNULL(conn.ackStates.initialAckState.get());
  }
  folly::assume_unreachable();
}
/**
 * Read-only counterpart of the mutable getAckState() overload.
 *
 * CHECK-fails if the Initial or Handshake state requested is null; the
 * AppData state is always returned.
 */
const AckState& getAckState(
    const QuicConnectionStateBase& conn,
    PacketNumberSpace pnSpace) noexcept {
  switch (pnSpace) {
    case PacketNumberSpace::AppData:
      return conn.ackStates.appDataAckState;
    case PacketNumberSpace::Handshake:
      return *CHECK_NOTNULL(conn.ackStates.handshakeAckState.get());
    case PacketNumberSpace::Initial:
      return *CHECK_NOTNULL(conn.ackStates.initialAckState.get());
  }
  folly::assume_unreachable();
}
/**
 * Returns a pointer to the ACK state for `pnSpace` without any CHECK.
 *
 * For Initial and Handshake this is whatever pointer the connection currently
 * holds, so it may be null. The AppData result is never null.
 */
const AckState* getAckStatePtr(
    const QuicConnectionStateBase& conn,
    PacketNumberSpace pnSpace) noexcept {
  const auto& ackStates = conn.ackStates;
  switch (pnSpace) {
    case PacketNumberSpace::AppData:
      return &ackStates.appDataAckState;
    case PacketNumberSpace::Handshake:
      return ackStates.handshakeAckState.get();
    case PacketNumberSpace::Initial:
      return ackStates.initialAckState.get();
  }
  folly::assume_unreachable();
}
/**
 * Snapshots the insertVersion() of every ACK set on the connection.
 *
 * The Initial and Handshake versions are only filled in when those states
 * exist; otherwise they keep their default-initialized value.
 */
AckStateVersion currentAckStateVersion(
    const QuicConnectionStateBase& conn) noexcept {
  const auto& ackStates = conn.ackStates;
  AckStateVersion version;
  if (const auto* initial = ackStates.initialAckState.get()) {
    version.initialAckStateVersion = initial->acks.insertVersion();
  }
  if (const auto* handshake = ackStates.handshakeAckState.get()) {
    version.handshakeAckStateVersion = handshake->acks.insertVersion();
  }
  version.appDataAckStateVersion =
      ackStates.appDataAckState.acks.insertVersion();
  return version;
}
/**
 * Returns the packet number that the next packet sent in `pnSpace` will use
 * (CHECK-fails via getAckState() if that space's state is gone).
 */
PacketNum getNextPacketNum(
    const QuicConnectionStateBase& conn,
    PacketNumberSpace pnSpace) noexcept {
  const AckState& ackState = getAckState(conn, pnSpace);
  return ackState.nextPacketNum;
}
/**
 * Advances the next packet number for `pnSpace` by one.
 *
 * nonDsrPacketSequenceNumber is advanced too, unless `dsrPacket` is true.
 * Reaching kMaxPacketNumber requests that the transport be closed.
 */
void increaseNextPacketNum(
    QuicConnectionStateBase& conn,
    PacketNumberSpace pnSpace,
    bool dsrPacket) noexcept {
  AckState& state = getAckState(conn, pnSpace);
  ++state.nextPacketNum;
  if (!dsrPacket) {
    ++state.nonDsrPacketSequenceNumber;
  }
  const bool packetNumbersExhausted = state.nextPacketNum == kMaxPacketNumber;
  if (packetNumbersExhausted) {
    conn.pendingEvents.closeTransport = true;
  }
}
/**
 * Returns the first packet (from the front of conn.outstandings.packets) in
 * `packetNumberSpace` that is not declared lost, or end() if there is none.
 */
std::deque<OutstandingPacketWrapper>::iterator getFirstOutstandingPacket(
    QuicConnectionStateBase& conn,
    PacketNumberSpace packetNumberSpace) {
  auto& packets = conn.outstandings.packets;
  return getNextOutstandingPacket(conn, packetNumberSpace, packets.begin());
}
/**
 * Returns the last packet (nearest the back of conn.outstandings.packets) in
 * `packetNumberSpace` that is not declared lost, or rend() if there is none.
 */
std::deque<OutstandingPacketWrapper>::reverse_iterator getLastOutstandingPacket(
    QuicConnectionStateBase& conn,
    PacketNumberSpace packetNumberSpace) {
  auto& packets = conn.outstandings.packets;
  return getPreviousOutstandingPacket(conn, packetNumberSpace, packets.rbegin());
}
/**
 * Returns the last packet (nearest the back of conn.outstandings.packets) in
 * `packetNumberSpace`, counting packets declared lost as well, or rend() if
 * there is none.
 */
std::deque<OutstandingPacketWrapper>::reverse_iterator
getLastOutstandingPacketIncludingLost(
    QuicConnectionStateBase& conn,
    PacketNumberSpace packetNumberSpace) {
  auto& packets = conn.outstandings.packets;
  return getPreviousOutstandingPacketIncludingLost(
      conn, packetNumberSpace, packets.rbegin());
}
/**
 * Searches forward from `from` for the first packet in `packetNumberSpace`
 * that has not been declared lost.
 *
 * @return an iterator to that packet, or conn.outstandings.packets.end().
 */
std::deque<OutstandingPacketWrapper>::iterator getNextOutstandingPacket(
    QuicConnectionStateBase& conn,
    PacketNumberSpace packetNumberSpace,
    std::deque<OutstandingPacketWrapper>::iterator from) {
  const auto notLostInSpace = [packetNumberSpace](const auto& candidate) {
    if (candidate.declaredLost) {
      return false;
    }
    return candidate.packet.header.getPacketNumberSpace() == packetNumberSpace;
  };
  return std::find_if(from, conn.outstandings.packets.end(), notLostInSpace);
}
/**
 * Returns true if any existing ACK state (Initial, Handshake or AppData) has
 * a largestReceivedAtLastCloseSent value recorded.
 *
 * Null Initial/Handshake states are simply skipped.
 */
bool hasReceivedPacketsAtLastCloseSent(
    const QuicConnectionStateBase& conn) noexcept {
  const auto& ackStates = conn.ackStates;
  const auto* initial = ackStates.initialAckState.get();
  if (initial && initial->largestReceivedAtLastCloseSent) {
    return true;
  }
  const auto* handshake = ackStates.handshakeAckState.get();
  if (handshake && handshake->largestReceivedAtLastCloseSent) {
    return true;
  }
  return static_cast<bool>(
      ackStates.appDataAckState.largestReceivedAtLastCloseSent);
}
/**
 * Returns true if, for every existing ACK state, the largest received packet
 * number still equals the value snapshotted when the last close was sent.
 *
 * A null Initial or Handshake state imposes no condition.
 */
bool hasNotReceivedNewPacketsSinceLastCloseSent(
    const QuicConnectionStateBase& conn) noexcept {
  const auto unchangedSinceClose = [](const auto& state) {
    return state.largestReceivedAtLastCloseSent == state.largestRecvdPacketNum;
  };
  const auto* initial = conn.ackStates.initialAckState.get();
  const auto* handshake = conn.ackStates.handshakeAckState.get();
  return (!initial || unchangedSinceClose(*initial)) &&
      (!handshake || unchangedSinceClose(*handshake)) &&
      unchangedSinceClose(conn.ackStates.appDataAckState);
}
/**
 * Snapshots largestRecvdPacketNum into largestReceivedAtLastCloseSent for
 * every existing ACK state (null Initial/Handshake states are skipped).
 */
void updateLargestReceivedPacketsAtLastCloseSent(
    QuicConnectionStateBase& conn) noexcept {
  const auto snapshot = [](auto& state) {
    state.largestReceivedAtLastCloseSent = state.largestRecvdPacketNum;
  };
  if (auto* initial = conn.ackStates.initialAckState.get()) {
    snapshot(*initial);
  }
  if (auto* handshake = conn.ackStates.handshakeAckState.get()) {
    snapshot(*handshake);
  }
  snapshot(conn.ackStates.appDataAckState);
}
/**
 * Returns whether any packet has been received, across all packet number
 * spaces.
 *
 * NOTE: a null Initial or Handshake ACK state counts as "received" by itself,
 * so this returns true whenever either of those pointers is null. Presumably
 * a state is only released after packets in that space were processed.
 * NOTE(review): confirm this is the intended semantics.
 */
bool hasReceivedPackets(const QuicConnectionStateBase& conn) noexcept {
  const auto* initialAckState = conn.ackStates.initialAckState.get();
  if (!initialAckState || initialAckState->largestRecvdPacketNum) {
    return true;
  }
  const auto* handshakeAckState = conn.ackStates.handshakeAckState.get();
  if (!handshakeAckState || handshakeAckState->largestRecvdPacketNum) {
    return true;
  }
  return static_cast<bool>(
      conn.ackStates.appDataAckState.largestRecvdPacketNum);
}
/**
 * Returns a mutable reference to the loss time slot for `pnSpace` in
 * conn.lossState.lossTimes.
 */
folly::Optional<TimePoint>& getLossTime(
    QuicConnectionStateBase& conn,
    PacketNumberSpace pnSpace) noexcept {
  auto& lossTimes = conn.lossState.lossTimes;
  return lossTimes[pnSpace];
}
/**
 * The AppData loss timer is only considered once a 1-RTT write cipher is
 * present on the connection.
 */
bool canSetLossTimerForAppData(const QuicConnectionStateBase& conn) noexcept {
  const bool hasOneRttWriteCipher = conn.oneRttWriteCipher != nullptr;
  return hasOneRttWriteCipher;
}
/**
 * Returns the earliest pending loss time and its packet number space. The
 * AppData space only participates once canSetLossTimerForAppData() is true.
 */
std::pair<folly::Optional<TimePoint>, PacketNumberSpace> earliestLossTimer(
    const QuicConnectionStateBase& conn) noexcept {
  return earliestTimeAndSpace(
      conn.lossState.lossTimes, canSetLossTimerForAppData(conn));
}
std::pair<folly::Optional<TimePoint>, PacketNumberSpace> earliestTimeAndSpace(
const EnumArray<PacketNumberSpace, folly::Optional<TimePoint>>& times,
bool considerAppData) noexcept {
std::pair<folly::Optional<TimePoint>, PacketNumberSpace> res = {
folly::none, PacketNumberSpace::Initial};
for (PacketNumberSpace pns : times.keys()) {
if (!times[pns]) {
continue;
}
if (pns == PacketNumberSpace::AppData && !considerAppData) {
continue;
}
if (!res.first || *res.first > *times[pns]) {
res.first = times[pns];
res.second = pns;
}
}
return res;
}
/**
 * Number of connection IDs we may issue: the peer's advertised
 * active_connection_id_limit, capped at kMaxActiveConnectionIdLimit.
 */
uint64_t maximumConnectionIdsToIssue(const QuicConnectionStateBase& conn) {
  return std::min(
      conn.peerActiveConnectionIdLimit, kMaxActiveConnectionIdLimit);
}
/**
 * Records receipt of `packetNum` in `ackState`.
 *
 * Effects on `ackState`:
 *  - raises largestRecvdPacketNum to packetNum if packetNum is larger (or sets
 *    it if nothing was received yet);
 *  - inserts packetNum into acks, reporting onDuplicatedPacketReceived when
 *    the insert leaves insertVersion() unchanged;
 *  - sets largestRecvdPacketTime when packetNum is now the largest;
 *  - sets lastRecvdPacketInfo;
 *  - for packets not older than the expected next packet number, appends a
 *    receive-timestamp entry to recvdPacketInfos. That list is capped at
 *    conn.transportSettings.maxReceiveTimestampsPerAckStored entries, oldest
 *    evicted first. A cap of 0 stores nothing.
 *
 * @param conn       connection; supplies the stats callback and settings.
 * @param ackState   ACK state of the packet number space of the packet.
 * @param packetNum  number of the packet just received.
 * @param timings    receive timings; only receiveTimePoint is used here.
 * @return absolute distance between packetNum and the expected next packet
 *         number (previous largest + 1), or 0 if no packet had been received
 *         yet in this space.
 */
uint64_t addPacketToAckState(
    QuicConnectionStateBase& conn,
    AckState& ackState,
    const PacketNum packetNum,
    const ReceivedPacket::Timings& timings) {
  // What we expected next, computed before this packet updates the state.
  PacketNum expectedNextPacket = 0;
  if (ackState.largestRecvdPacketNum) {
    expectedNextPacket = *ackState.largestRecvdPacketNum + 1;
  }
  ackState.largestRecvdPacketNum = std::max<PacketNum>(
      ackState.largestRecvdPacketNum.value_or(packetNum), packetNum);

  // An unchanged insertVersion() after insert() is counted as a duplicate.
  const auto preInsertVersion = ackState.acks.insertVersion();
  ackState.acks.insert(packetNum);
  if (preInsertVersion == ackState.acks.insertVersion()) {
    QUIC_STATS(conn.statsCallback, onDuplicatedPacketReceived);
  }

  if (ackState.largestRecvdPacketNum == packetNum) {
    ackState.largestRecvdPacketTime = timings.receiveTimePoint;
  }
  static_assert(Clock::is_steady, "Needs steady clock");
  ackState.lastRecvdPacketInfo.assign({packetNum, timings.receiveTimePoint});

  if (packetNum >= expectedNextPacket) {
    const auto maxStored =
        conn.transportSettings.maxReceiveTimestampsPerAckStored;
    // Evict the oldest entries until there is room for one more. The loop
    // (rather than a single equality check) also trims the list if it ever
    // exceeds the cap. A cap of 0 stores nothing and never pops from an
    // empty container.
    if (maxStored > 0) {
      while (ackState.recvdPacketInfos.size() >= maxStored) {
        ackState.recvdPacketInfos.pop_front();
      }
      ackState.recvdPacketInfos.emplace_back(
          RecvdPacketInfo{packetNum, timings.receiveTimePoint});
    }
  }

  if (!expectedNextPacket) {
    // First packet in this space: there is nothing to compare against.
    return 0;
  }
  return packetNum > expectedNextPacket ? packetNum - expectedNextPacket
                                        : expectedNextPacket - packetNum;
}
/**
 * Custom per-stream-group retransmission profiles are in effect only when we
 * advertise support for at least one stream group.
 */
bool checkCustomRetransmissionProfilesEnabled(
    const QuicConnectionStateBase& conn) {
  const auto advertisedGroups =
      conn.transportSettings.advertisedMaxStreamGroups;
  return advertisedGroups > 0;
}
/**
 * Checks if the retransmission policy on the stream group prohibits
 * retransmissions.
 *
 * Returns false unless custom retransmission profiles are enabled, the stream
 * belongs to a group, and a policy is registered for that group. In that case
 * the policy's disableRetransmission flag is returned.
 */
bool streamRetransmissionDisabled(
    QuicConnectionStateBase& conn,
    const QuicStreamState& stream) {
  if (!checkCustomRetransmissionProfilesEnabled(conn) || !stream.groupId) {
    return false;
  }
  const auto policyIt = conn.retransmissionPolicies.find(*stream.groupId);
  if (policyIt == conn.retransmissionPolicies.cend()) {
    return false;
  }
  return policyIt->second.disableRetransmission;
}
} // namespace quic