1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Application limited time in packet metadata

Summary:
Track the total (cumulative) amount of time that the connection has been application limited and store this information in `OutstandingPacketMetadata`. If this value is the same for two `OutstandingPacketMetadata` then we know that the transport did not become application limited between when those two packets were sent (or, less likely, the transport was application limited for less than one microsecond given the microsecond resolution of the timestamp).

We store the amount of time spent application limited instead of a count of the number of application limited events because the implications of being application limited are time dependent.

Tests show that we need to be able to inject a mockable clock. That's been an issue for some time; will work on in a subsequent diff.

Differential Revision: D41714879

fbshipit-source-id: 9fd4fe321d85639dc9fb5c2cd51713c481cbeb22
This commit is contained in:
Brandon Schlinker
2022-12-08 18:55:22 -08:00
committed by Facebook GitHub Bot
parent 9ddbed6d5e
commit eb3009484a
7 changed files with 380 additions and 14 deletions

View File

@@ -3160,9 +3160,10 @@ void QuicTransportBase::writeSocketData() {
conn_->outstandings.numOutstanding(); conn_->outstandings.numOutstanding();
// if we're starting to write from app limited, notify observers // if we're starting to write from app limited, notify observers
if (conn_->waitingForAppData && conn_->congestionController) { if (conn_->appLimitedTracker.isAppLimited() &&
conn_->congestionController) {
conn_->appLimitedTracker.setNotAppLimited();
notifyStartWritingFromAppRateLimited(); notifyStartWritingFromAppRateLimited();
conn_->waitingForAppData = false;
} }
writeData(); writeData();
if (closeState_ != CloseState::CLOSED) { if (closeState_ != CloseState::CLOSED) {
@@ -3241,8 +3242,8 @@ void QuicTransportBase::writeSocketData() {
if (transportReadyNotified_ && connCallback_) { if (transportReadyNotified_ && connCallback_) {
connCallback_->onAppRateLimited(); connCallback_->onAppRateLimited();
} }
conn_->appLimitedTracker.setAppLimited();
notifyAppRateLimited(); notifyAppRateLimited();
conn_->waitingForAppData = true;
} }
} }
} }

View File

@@ -884,7 +884,8 @@ void updateConnection(
conn.outstandings.numOutstanding() + 1, conn.outstandings.numOutstanding() + 1,
conn.lossState, conn.lossState,
conn.writeCount, conn.writeCount,
std::move(detailsPerStream)); std::move(detailsPerStream),
conn.appLimitedTracker.getTotalAppLimitedTime());
if (isD6DProbe) { if (isD6DProbe) {
++conn.d6d.outstandingProbes; ++conn.d6d.outstandingProbes;

View File

@@ -1520,6 +1520,102 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithBytesStats) {
->lastAckedPacketInfo->totalBytesAcked); ->lastAckedPacketInfo->totalBytesAcked);
} }
TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithAppLimitedStats) {
auto conn = createConn();
auto stream = conn->streamManager->createNextBidirectionalStream().value();
auto packet = buildEmptyPacket(*conn, PacketNumberSpace::Handshake);
writeDataToQuicStream(
*stream, folly::IOBuf::copyBuffer("Im gonna cut your hair."), true);
WriteStreamFrame writeStreamFrame(stream->id, 0, 5, false);
packet.packet.frames.push_back(std::move(writeStreamFrame));
// connections should start off being app limited
EXPECT_TRUE(conn->appLimitedTracker.isAppLimited());
// mark ourselves as not app limited, verify that total app limited time > 0,
// verify that successive calls to getTotalAppLimitedTime yield same value
conn->appLimitedTracker.setNotAppLimited();
EXPECT_LT(0us, conn->appLimitedTracker.getTotalAppLimitedTime());
EXPECT_EQ(
conn->appLimitedTracker.getTotalAppLimitedTime(),
conn->appLimitedTracker.getTotalAppLimitedTime());
// record the packet as having been sent
updateConnection(
*conn,
folly::none,
packet.packet,
TimePoint(),
555,
500,
false /* isDSRPacket */);
// should have the current app limited time recorded in metadata
EXPECT_EQ(
conn->appLimitedTracker.getTotalAppLimitedTime(),
getFirstOutstandingPacket(*conn, PacketNumberSpace::Handshake)
->metadata.totalAppLimitedTimeUsecs);
}
TEST_F(
QuicTransportFunctionsTest,
TestUpdateConnectionWithAppLimitedStats_MultipleTransitions) {
auto conn = createConn();
auto stream = conn->streamManager->createNextBidirectionalStream().value();
auto packet = buildEmptyPacket(*conn, PacketNumberSpace::Handshake);
writeDataToQuicStream(
*stream, folly::IOBuf::copyBuffer("Im gonna cut your hair."), true);
WriteStreamFrame writeStreamFrame(stream->id, 0, 5, false);
packet.packet.frames.push_back(std::move(writeStreamFrame));
// connections should start off being app limited
EXPECT_TRUE(conn->appLimitedTracker.isAppLimited());
{
// successive calls should yield different measurements
const auto time1 = conn->appLimitedTracker.getTotalAppLimitedTime();
std::this_thread::sleep_for(10ms);
const auto time2 = conn->appLimitedTracker.getTotalAppLimitedTime();
EXPECT_NE(time1, time2);
}
// mark ourselves as not app limited, verify that total app limited time > 0,
// verify that successive calls to getTotalAppLimitedTime yield same value
conn->appLimitedTracker.setNotAppLimited();
EXPECT_LT(0us, conn->appLimitedTracker.getTotalAppLimitedTime());
EXPECT_EQ(
conn->appLimitedTracker.getTotalAppLimitedTime(),
conn->appLimitedTracker.getTotalAppLimitedTime());
const auto appLimitedTime1 = conn->appLimitedTracker.getTotalAppLimitedTime();
// become app limited for at least 10ms, then repeat the above
conn->appLimitedTracker.setAppLimited();
std::this_thread::sleep_for(10ms);
conn->appLimitedTracker.setNotAppLimited();
EXPECT_LE(
appLimitedTime1 + 10ms, conn->appLimitedTracker.getTotalAppLimitedTime());
EXPECT_EQ(
conn->appLimitedTracker.getTotalAppLimitedTime(),
conn->appLimitedTracker.getTotalAppLimitedTime());
// record the packet as having been sent
updateConnection(
*conn,
folly::none,
packet.packet,
TimePoint(),
555,
500,
false /* isDSRPacket */);
// should have the current app limited time recorded in metadata
EXPECT_EQ(
conn->appLimitedTracker.getTotalAppLimitedTime(),
getFirstOutstandingPacket(*conn, PacketNumberSpace::Handshake)
->metadata.totalAppLimitedTimeUsecs);
}
TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithCloneResult) { TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithCloneResult) {
auto conn = createConn(); auto conn = createConn();
conn->qLogger = std::make_shared<quic::FileQLogger>(VantagePoint::Client); conn->qLogger = std::make_shared<quic::FileQLogger>(VantagePoint::Client);

View File

@@ -736,6 +736,163 @@ TYPED_TEST(
this->destroyTransport(); this->destroyTransport();
} }
/**
* Verify app limited time tracking and annotation.
*/
TYPED_TEST(QuicTypedTransportAfterStartTest, TotalAppLimitedTime) {
// ACK outstanding packets so that we can switch out the congestion control
this->ackAllOutstandingPackets();
EXPECT_THAT(this->getConn().outstandings.packets, IsEmpty());
// install StaticCwndCongestionController
const auto cwndInBytes = 7000;
this->getNonConstConn().congestionController =
std::make_unique<StaticCwndCongestionController>(
StaticCwndCongestionController::CwndInBytes(cwndInBytes));
// install PacketProcessor
auto mockPacketProcessor = std::make_unique<MockPacketProcessor>();
auto rawPacketProcessor = mockPacketProcessor.get();
this->getNonConstConn().packetProcessors.push_back(
std::move(mockPacketProcessor));
auto streamId = this->getTransport()->createBidirectionalStream().value();
// write 1700 bytes to stream to generate two packets back to back
// both packets should have the same app limited time
auto firstPacketTotalAppLimitedTimeUsecs = 0us;
{
EXPECT_CALL(*rawPacketProcessor, onPacketSent(_))
.Times(2)
.WillOnce(Invoke([&](auto outstandingPacket) {
EXPECT_EQ(4, outstandingPacket.metadata.totalPacketsSent);
EXPECT_EQ(1, outstandingPacket.metadata.packetsInflight);
EXPECT_EQ(3, outstandingPacket.metadata.writeCount);
EXPECT_NE(0us, outstandingPacket.metadata.totalAppLimitedTimeUsecs);
firstPacketTotalAppLimitedTimeUsecs =
outstandingPacket.metadata.totalAppLimitedTimeUsecs;
}))
.WillOnce(Invoke([&](auto outstandingPacket) {
EXPECT_EQ(5, outstandingPacket.metadata.totalPacketsSent);
EXPECT_EQ(2, outstandingPacket.metadata.packetsInflight);
EXPECT_EQ(3, outstandingPacket.metadata.writeCount);
EXPECT_EQ(
firstPacketTotalAppLimitedTimeUsecs,
outstandingPacket.metadata.totalAppLimitedTimeUsecs);
}));
const auto bufLength = 1700;
auto buf = buildRandomInputData(bufLength);
this->getTransport()->writeChain(streamId, std::move(buf), false);
const auto maybeWrittenPackets1 = this->loopForWrites();
// should have sent two packets
ASSERT_TRUE(maybeWrittenPackets1.has_value());
quic::PacketNum firstPacketNum = maybeWrittenPackets1->start;
quic::PacketNum lastPacketNum = maybeWrittenPackets1->end;
EXPECT_EQ(2, lastPacketNum - firstPacketNum + 1);
}
// now we're going to be application limited for 10ms (or more)
std::this_thread::sleep_for(10ms);
// write 10000 bytes to stream to generate multiple packets back to back
// not all will be sent at once because our CWND is only 7000 bytes, and we
// already used 1700 bytes+ in previous send
//
// when (eventually) sent, all packets should have
// - the same app limited time
// - app limited time >= 10ms + firstPacketTotalAppLimitedTimeUsecs
auto thirdPacketTotalAppLimitedTimeUsecs = 0us;
{
EXPECT_CALL(*rawPacketProcessor, onPacketSent(_))
.Times(4)
.WillOnce(Invoke([&](auto outstandingPacket) {
EXPECT_EQ(4, outstandingPacket.metadata.writeCount);
EXPECT_LE(
firstPacketTotalAppLimitedTimeUsecs + 10ms,
outstandingPacket.metadata.totalAppLimitedTimeUsecs);
thirdPacketTotalAppLimitedTimeUsecs =
outstandingPacket.metadata.totalAppLimitedTimeUsecs;
}))
.WillRepeatedly(Invoke([&](auto outstandingPacket) {
EXPECT_EQ(
thirdPacketTotalAppLimitedTimeUsecs,
outstandingPacket.metadata.totalAppLimitedTimeUsecs);
}));
const auto bufLength = 10000;
auto buf = buildRandomInputData(bufLength);
this->getTransport()->writeChain(streamId, std::move(buf), false);
const auto maybeWrittenPackets = this->loopForWrites();
// should have sent five packets
ASSERT_TRUE(maybeWrittenPackets.has_value());
quic::PacketNum firstPacketNum = maybeWrittenPackets->start;
quic::PacketNum lastPacketNum = maybeWrittenPackets->end;
EXPECT_EQ(4, lastPacketNum - firstPacketNum + 1);
}
// deliver an ACK for all of the outstanding packets
this->ackAllOutstandingPackets();
// finish sending the rest of the packets
// they should have the same app limited time as packet #3
{
EXPECT_CALL(*rawPacketProcessor, onPacketSent(_))
.Times(3)
.WillRepeatedly(Invoke([&](auto outstandingPacket) {
EXPECT_EQ(
thirdPacketTotalAppLimitedTimeUsecs,
outstandingPacket.metadata.totalAppLimitedTimeUsecs);
}));
const auto maybeWrittenPackets = this->loopForWrites();
ASSERT_TRUE(maybeWrittenPackets.has_value());
quic::PacketNum firstPacketNum = maybeWrittenPackets->start;
quic::PacketNum lastPacketNum = maybeWrittenPackets->end;
EXPECT_EQ(3, lastPacketNum - firstPacketNum + 1);
}
// deliver an ACK for all of the outstanding packets
this->ackAllOutstandingPackets();
// now we're going to be application limited again for 10ms (or more)
std::this_thread::sleep_for(10ms);
// finally, write 1700 bytes again, and verify we see a new app limited time
{
auto penultimatePacketTotalAppLimitedTimeUsecs = 0us;
EXPECT_CALL(*rawPacketProcessor, onPacketSent(_))
.Times(2)
.WillOnce(Invoke([&](auto outstandingPacket) {
EXPECT_LE(
thirdPacketTotalAppLimitedTimeUsecs + 10ms,
outstandingPacket.metadata.totalAppLimitedTimeUsecs);
penultimatePacketTotalAppLimitedTimeUsecs =
outstandingPacket.metadata.totalAppLimitedTimeUsecs;
}))
.WillOnce(Invoke([&](auto outstandingPacket) {
EXPECT_EQ(
penultimatePacketTotalAppLimitedTimeUsecs,
outstandingPacket.metadata.totalAppLimitedTimeUsecs);
}));
const auto bufLength = 1700;
auto buf = buildRandomInputData(bufLength);
this->getTransport()->writeChain(streamId, std::move(buf), false);
const auto maybeWrittenPackets1 = this->loopForWrites();
// should have sent two packets
ASSERT_TRUE(maybeWrittenPackets1.has_value());
quic::PacketNum firstPacketNum = maybeWrittenPackets1->start;
quic::PacketNum lastPacketNum = maybeWrittenPackets1->end;
EXPECT_EQ(2, lastPacketNum - firstPacketNum + 1);
}
this->destroyTransport();
}
TYPED_TEST( TYPED_TEST(
QuicTypedTransportAfterStartTest, QuicTypedTransportAfterStartTest,
StreamAckedIntervalsDeliveryCallbacks) { StreamAckedIntervalsDeliveryCallbacks) {

View File

@@ -10,6 +10,7 @@
#include <quic/codec/Types.h> #include <quic/codec/Types.h>
#include <quic/state/LossState.h> #include <quic/state/LossState.h>
#include <quic/state/PacketEvent.h> #include <quic/state/PacketEvent.h>
#include <chrono>
namespace quic { namespace quic {
@@ -104,6 +105,10 @@ struct OutstandingPacketMetadata {
// Details about each stream with frames in this packet // Details about each stream with frames in this packet
DetailsPerStream detailsPerStream; DetailsPerStream detailsPerStream;
// Total time spent app limited on this connection including when this packet
// was sent.
std::chrono::microseconds totalAppLimitedTimeUsecs{0};
OutstandingPacketMetadata( OutstandingPacketMetadata(
TimePoint timeIn, TimePoint timeIn,
uint32_t encodedSizeIn, uint32_t encodedSizeIn,
@@ -116,7 +121,8 @@ struct OutstandingPacketMetadata {
uint64_t packetsInflightIn, uint64_t packetsInflightIn,
const LossState& lossStateIn, const LossState& lossStateIn,
uint64_t writeCount, uint64_t writeCount,
DetailsPerStream detailsPerStream) DetailsPerStream detailsPerStream,
std::chrono::microseconds totalAppLimitedTimeUsecsIn = 0us)
: time(timeIn), : time(timeIn),
encodedSize(encodedSizeIn), encodedSize(encodedSizeIn),
encodedBodySize(encodedBodySizeIn), encodedBodySize(encodedBodySizeIn),
@@ -129,7 +135,8 @@ struct OutstandingPacketMetadata {
totalPacketsSent(lossStateIn.totalPacketsSent), totalPacketsSent(lossStateIn.totalPacketsSent),
totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent), totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent),
writeCount(writeCount), writeCount(writeCount),
detailsPerStream(std::move(detailsPerStream)) {} detailsPerStream(std::move(detailsPerStream)),
totalAppLimitedTimeUsecs(totalAppLimitedTimeUsecsIn) {}
}; };
// Data structure to represent outstanding retransmittable packets // Data structure to represent outstanding retransmittable packets
@@ -206,7 +213,8 @@ struct OutstandingPacket {
uint64_t packetsInflightIn, uint64_t packetsInflightIn,
const LossState& lossStateIn, const LossState& lossStateIn,
uint64_t writeCount, uint64_t writeCount,
Metadata::DetailsPerStream detailsPerStream) Metadata::DetailsPerStream detailsPerStream,
std::chrono::microseconds totalAppLimitedTimeUsecs = 0us)
: packet(std::move(packetIn)), : packet(std::move(packetIn)),
metadata(OutstandingPacketMetadata( metadata(OutstandingPacketMetadata(
timeIn, timeIn,
@@ -220,7 +228,8 @@ struct OutstandingPacket {
packetsInflightIn, packetsInflightIn,
lossStateIn, lossStateIn,
writeCount, writeCount,
std::move(detailsPerStream))) {} std::move(detailsPerStream),
totalAppLimitedTimeUsecs)) {}
OutstandingPacket( OutstandingPacket(
RegularQuicWritePacket packetIn, RegularQuicWritePacket packetIn,
@@ -235,7 +244,8 @@ struct OutstandingPacket {
uint64_t packetsInflightIn, uint64_t packetsInflightIn,
const LossState& lossStateIn, const LossState& lossStateIn,
uint64_t writeCount, uint64_t writeCount,
Metadata::DetailsPerStream detailsPerStream) Metadata::DetailsPerStream detailsPerStream,
std::chrono::microseconds totalAppLimitedTimeUsecs = 0us)
: packet(std::move(packetIn)), : packet(std::move(packetIn)),
metadata(OutstandingPacketMetadata( metadata(OutstandingPacketMetadata(
timeIn, timeIn,
@@ -249,6 +259,7 @@ struct OutstandingPacket {
packetsInflightIn, packetsInflightIn,
lossStateIn, lossStateIn,
writeCount, writeCount,
std::move(detailsPerStream))) {} std::move(detailsPerStream),
totalAppLimitedTimeUsecs)) {}
}; };
} // namespace quic } // namespace quic

View File

@@ -142,6 +142,65 @@ struct OutstandingsInfo {
} }
}; };
class AppLimitedTracker {
public:
using Clock = std::chrono::steady_clock;
/**
* Mark the connection as application limited.
*/
void setAppLimited() {
DCHECK(!isAppLimited_);
isAppLimited_ = true;
appLimitedStartTime_ = Clock::now();
}
/**
* Mark the connection as not being application limited.
*/
void setNotAppLimited() {
DCHECK(isAppLimited_);
isAppLimited_ = false;
totalAppLimitedTime_ +=
std::chrono::duration_cast<std::chrono::microseconds>(
Clock::now() - appLimitedStartTime_);
}
/**
* Returns whether the connection has been marked as appplication limited.
*/
bool isAppLimited() {
return isAppLimited_;
}
/**
* Returns total time connection has spent application limited.
*
* If connection is currently application limited, the time in the current
* application limited period is included.
*/
std::chrono::microseconds getTotalAppLimitedTime() {
if (isAppLimited_) {
return std::chrono::duration_cast<std::chrono::microseconds>(
Clock::now() - appLimitedStartTime_) +
totalAppLimitedTime_;
}
return totalAppLimitedTime_;
}
private:
// Total time spent application limited, excluding now.
std::chrono::microseconds totalAppLimitedTime_{0us};
// Whether we're currently application limited.
// Initialize to true since all connections start off application limited.
bool isAppLimited_{true};
// When we last became application limited.
// Initialize to now() since all connections start off application limited.
Clock::time_point appLimitedStartTime_{Clock::now()};
};
struct Pacer { struct Pacer {
virtual ~Pacer() = default; virtual ~Pacer() = default;
@@ -689,10 +748,8 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
// For example, we may not want to pace a connection that's still handshaking. // For example, we may not want to pace a connection that's still handshaking.
bool canBePaced{false}; bool canBePaced{false};
// Flag indicating whether the socket is currently waiting for the app to // Tracking of application limited time.
// write data. All new sockets start off in this state where they wait for the AppLimitedTracker appLimitedTracker;
// application to pump data to the socket.
bool waitingForAppData{true};
// Monotonically increasing counter that is incremented each time there is a // Monotonically increasing counter that is incremented each time there is a
// write on this socket (writeSocketData() is called), This is used to // write on this socket (writeSocketData() is called), This is used to

View File

@@ -89,6 +89,49 @@ TEST_F(StateDataTest, CongestionControllerState) {
} }
} }
TEST_F(StateDataTest, AppLimitedTracker) {
AppLimitedTracker tracker;
// initialized to app limited
EXPECT_TRUE(tracker.isAppLimited());
// if app limited, getTotalAppLimitedTime includes current app limited time
{
const auto totalAppLimitedTime1 = tracker.getTotalAppLimitedTime();
std::this_thread::sleep_for(10ms);
const auto totalAppLimitedTime2 = tracker.getTotalAppLimitedTime();
EXPECT_LE(totalAppLimitedTime1, totalAppLimitedTime2);
EXPECT_GE(totalAppLimitedTime2, totalAppLimitedTime1 + 10ms);
}
// when we become non-app limited, we properly track time spent app limited
{
const auto totalAppLimitedTime1 = tracker.getTotalAppLimitedTime();
tracker.setNotAppLimited();
EXPECT_LE(totalAppLimitedTime1, tracker.getTotalAppLimitedTime());
}
// if we become app limited again, total time is >= existing time
{
const auto totalAppLimitedTime1 = tracker.getTotalAppLimitedTime();
tracker.setAppLimited();
EXPECT_LE(totalAppLimitedTime1, tracker.getTotalAppLimitedTime());
std::this_thread::sleep_for(10ms);
const auto totalAppLimitedTime2 = tracker.getTotalAppLimitedTime();
EXPECT_LE(totalAppLimitedTime1, totalAppLimitedTime2);
EXPECT_GE(totalAppLimitedTime2, totalAppLimitedTime1 + 10ms);
}
// when we become non-app limited, we properly track time spent app limited
{
const auto totalAppLimitedTime1 = tracker.getTotalAppLimitedTime();
tracker.setNotAppLimited();
EXPECT_LE(totalAppLimitedTime1, tracker.getTotalAppLimitedTime());
}
}
TEST_F(StateDataTest, EmptyLossEvent) { TEST_F(StateDataTest, EmptyLossEvent) {
CongestionController::LossEvent loss; CongestionController::LossEvent loss;
EXPECT_EQ(0, loss.lostBytes); EXPECT_EQ(0, loss.lostBytes);