diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 41889cddb..dff1671a3 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -3160,9 +3160,10 @@ void QuicTransportBase::writeSocketData() { conn_->outstandings.numOutstanding(); // if we're starting to write from app limited, notify observers - if (conn_->waitingForAppData && conn_->congestionController) { + if (conn_->appLimitedTracker.isAppLimited() && + conn_->congestionController) { + conn_->appLimitedTracker.setNotAppLimited(); notifyStartWritingFromAppRateLimited(); - conn_->waitingForAppData = false; } writeData(); if (closeState_ != CloseState::CLOSED) { @@ -3241,8 +3242,8 @@ void QuicTransportBase::writeSocketData() { if (transportReadyNotified_ && connCallback_) { connCallback_->onAppRateLimited(); } + conn_->appLimitedTracker.setAppLimited(); notifyAppRateLimited(); - conn_->waitingForAppData = true; } } } diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 7d455cea7..bd2c33f9f 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -884,7 +884,8 @@ void updateConnection( conn.outstandings.numOutstanding() + 1, conn.lossState, conn.writeCount, - std::move(detailsPerStream)); + std::move(detailsPerStream), + conn.appLimitedTracker.getTotalAppLimitedTime()); if (isD6DProbe) { ++conn.d6d.outstandingProbes; diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index 399d8a560..ce395d83c 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -1520,6 +1520,102 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithBytesStats) { ->lastAckedPacketInfo->totalBytesAcked); } +TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithAppLimitedStats) { + auto conn = createConn(); + + auto stream = conn->streamManager->createNextBidirectionalStream().value(); + auto packet = buildEmptyPacket(*conn, PacketNumberSpace::Handshake); + writeDataToQuicStream( + *stream, folly::IOBuf::copyBuffer("Im gonna cut your hair."), true); + WriteStreamFrame writeStreamFrame(stream->id, 0, 5, false); + packet.packet.frames.push_back(std::move(writeStreamFrame)); + + // connections should start off being app limited + EXPECT_TRUE(conn->appLimitedTracker.isAppLimited()); + + // mark ourselves as not app limited, verify that total app limited time > 0, + // verify that successive calls to getTotalAppLimitedTime yield same value + conn->appLimitedTracker.setNotAppLimited(); + EXPECT_LT(0us, conn->appLimitedTracker.getTotalAppLimitedTime()); + EXPECT_EQ( + conn->appLimitedTracker.getTotalAppLimitedTime(), + conn->appLimitedTracker.getTotalAppLimitedTime()); + + // record the packet as having been sent + updateConnection( + *conn, + folly::none, + packet.packet, + TimePoint(), + 555, + 500, + false /* isDSRPacket */); + + // should have the current app limited time recorded in metadata + EXPECT_EQ( + conn->appLimitedTracker.getTotalAppLimitedTime(), + getFirstOutstandingPacket(*conn, PacketNumberSpace::Handshake) + ->metadata.totalAppLimitedTimeUsecs); +} + +TEST_F( + QuicTransportFunctionsTest, + TestUpdateConnectionWithAppLimitedStats_MultipleTransitions) { + auto conn = createConn(); + + auto stream = conn->streamManager->createNextBidirectionalStream().value(); + auto packet = buildEmptyPacket(*conn, PacketNumberSpace::Handshake); + writeDataToQuicStream( + *stream, folly::IOBuf::copyBuffer("Im gonna cut your hair."), true); + WriteStreamFrame writeStreamFrame(stream->id, 0, 5, false); + packet.packet.frames.push_back(std::move(writeStreamFrame)); + + // connections should start off being app limited + EXPECT_TRUE(conn->appLimitedTracker.isAppLimited()); + { + // successive calls should yield different measurements + const auto time1 = conn->appLimitedTracker.getTotalAppLimitedTime(); + std::this_thread::sleep_for(10ms); + const auto time2 = conn->appLimitedTracker.getTotalAppLimitedTime(); + EXPECT_NE(time1, time2); + } + + // mark ourselves as not app limited, verify that total app limited time > 0, + // verify that successive calls to getTotalAppLimitedTime yield same value + conn->appLimitedTracker.setNotAppLimited(); + EXPECT_LT(0us, conn->appLimitedTracker.getTotalAppLimitedTime()); + EXPECT_EQ( + conn->appLimitedTracker.getTotalAppLimitedTime(), + conn->appLimitedTracker.getTotalAppLimitedTime()); + const auto appLimitedTime1 = conn->appLimitedTracker.getTotalAppLimitedTime(); + + // become app limited for at least 10ms, then repeat the above + conn->appLimitedTracker.setAppLimited(); + std::this_thread::sleep_for(10ms); + conn->appLimitedTracker.setNotAppLimited(); + EXPECT_LE( + appLimitedTime1 + 10ms, conn->appLimitedTracker.getTotalAppLimitedTime()); + EXPECT_EQ( + conn->appLimitedTracker.getTotalAppLimitedTime(), + conn->appLimitedTracker.getTotalAppLimitedTime()); + + // record the packet as having been sent + updateConnection( + *conn, + folly::none, + packet.packet, + TimePoint(), + 555, + 500, + false /* isDSRPacket */); + + // should have the current app limited time recorded in metadata + EXPECT_EQ( + conn->appLimitedTracker.getTotalAppLimitedTime(), + getFirstOutstandingPacket(*conn, PacketNumberSpace::Handshake) + ->metadata.totalAppLimitedTimeUsecs); +} + TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithCloneResult) { auto conn = createConn(); conn->qLogger = std::make_shared(VantagePoint::Client); diff --git a/quic/api/test/QuicTypedTransportTest.cpp b/quic/api/test/QuicTypedTransportTest.cpp index 4f28b82c6..14955c267 100644 --- a/quic/api/test/QuicTypedTransportTest.cpp +++ b/quic/api/test/QuicTypedTransportTest.cpp @@ -736,6 +736,163 @@ TYPED_TEST( this->destroyTransport(); } +/** + * Verify app limited time tracking and annotation. + */ +TYPED_TEST(QuicTypedTransportAfterStartTest, TotalAppLimitedTime) { + // ACK outstanding packets so that we can switch out the congestion control + this->ackAllOutstandingPackets(); + EXPECT_THAT(this->getConn().outstandings.packets, IsEmpty()); + + // install StaticCwndCongestionController + const auto cwndInBytes = 7000; + this->getNonConstConn().congestionController = + std::make_unique( + StaticCwndCongestionController::CwndInBytes(cwndInBytes)); + + // install PacketProcessor + auto mockPacketProcessor = std::make_unique(); + auto rawPacketProcessor = mockPacketProcessor.get(); + this->getNonConstConn().packetProcessors.push_back( + std::move(mockPacketProcessor)); + + auto streamId = this->getTransport()->createBidirectionalStream().value(); + + // write 1700 bytes to stream to generate two packets back to back + // both packets should have the same app limited time + auto firstPacketTotalAppLimitedTimeUsecs = 0us; + { + EXPECT_CALL(*rawPacketProcessor, onPacketSent(_)) + .Times(2) + .WillOnce(Invoke([&](auto outstandingPacket) { + EXPECT_EQ(4, outstandingPacket.metadata.totalPacketsSent); + EXPECT_EQ(1, outstandingPacket.metadata.packetsInflight); + EXPECT_EQ(3, outstandingPacket.metadata.writeCount); + EXPECT_NE(0us, outstandingPacket.metadata.totalAppLimitedTimeUsecs); + firstPacketTotalAppLimitedTimeUsecs = + outstandingPacket.metadata.totalAppLimitedTimeUsecs; + })) + .WillOnce(Invoke([&](auto outstandingPacket) { + EXPECT_EQ(5, outstandingPacket.metadata.totalPacketsSent); + EXPECT_EQ(2, outstandingPacket.metadata.packetsInflight); + EXPECT_EQ(3, outstandingPacket.metadata.writeCount); + EXPECT_EQ( + firstPacketTotalAppLimitedTimeUsecs, + outstandingPacket.metadata.totalAppLimitedTimeUsecs); + })); + + const auto bufLength = 1700; + auto buf = buildRandomInputData(bufLength); + this->getTransport()->writeChain(streamId, std::move(buf), false); + const auto maybeWrittenPackets1 = this->loopForWrites(); + + // should have sent two packets + ASSERT_TRUE(maybeWrittenPackets1.has_value()); + quic::PacketNum firstPacketNum = maybeWrittenPackets1->start; + quic::PacketNum lastPacketNum = maybeWrittenPackets1->end; + EXPECT_EQ(2, lastPacketNum - firstPacketNum + 1); + } + + // now we're going to be application limited for 10ms (or more) + std::this_thread::sleep_for(10ms); + + // write 10000 bytes to stream to generate multiple packets back to back + // not all will be sent at once because our CWND is only 7000 bytes, and we + // already used 1700 bytes+ in previous send + // + // when (eventually) sent, all packets should have + // - the same app limited time + // - app limited time >= 10ms + firstPacketTotalAppLimitedTimeUsecs + auto thirdPacketTotalAppLimitedTimeUsecs = 0us; + { + EXPECT_CALL(*rawPacketProcessor, onPacketSent(_)) + .Times(4) + .WillOnce(Invoke([&](auto outstandingPacket) { + EXPECT_EQ(4, outstandingPacket.metadata.writeCount); + EXPECT_LE( + firstPacketTotalAppLimitedTimeUsecs + 10ms, + outstandingPacket.metadata.totalAppLimitedTimeUsecs); + thirdPacketTotalAppLimitedTimeUsecs = + outstandingPacket.metadata.totalAppLimitedTimeUsecs; + })) + .WillRepeatedly(Invoke([&](auto outstandingPacket) { + EXPECT_EQ( + thirdPacketTotalAppLimitedTimeUsecs, + outstandingPacket.metadata.totalAppLimitedTimeUsecs); + })); + + const auto bufLength = 10000; + auto buf = buildRandomInputData(bufLength); + this->getTransport()->writeChain(streamId, std::move(buf), false); + const auto maybeWrittenPackets = this->loopForWrites(); + + // should have sent five packets + ASSERT_TRUE(maybeWrittenPackets.has_value()); + quic::PacketNum firstPacketNum = maybeWrittenPackets->start; + quic::PacketNum lastPacketNum = maybeWrittenPackets->end; + EXPECT_EQ(4, lastPacketNum - firstPacketNum + 1); + } + + // deliver an ACK for all of the outstanding packets + this->ackAllOutstandingPackets(); + + // finish sending the rest of the packets + // they should have the same app limited time as packet #3 + { + EXPECT_CALL(*rawPacketProcessor, onPacketSent(_)) + .Times(3) + .WillRepeatedly(Invoke([&](auto outstandingPacket) { + EXPECT_EQ( + thirdPacketTotalAppLimitedTimeUsecs, + outstandingPacket.metadata.totalAppLimitedTimeUsecs); + })); + + const auto maybeWrittenPackets = this->loopForWrites(); + ASSERT_TRUE(maybeWrittenPackets.has_value()); + quic::PacketNum firstPacketNum = maybeWrittenPackets->start; + quic::PacketNum lastPacketNum = maybeWrittenPackets->end; + EXPECT_EQ(3, lastPacketNum - firstPacketNum + 1); + } + + // deliver an ACK for all of the outstanding packets + this->ackAllOutstandingPackets(); + + // now we're going to be application limited again for 10ms (or more) + std::this_thread::sleep_for(10ms); + + // finally, write 1700 bytes again, and verify we see a new app limited time + { + auto penultimatePacketTotalAppLimitedTimeUsecs = 0us; + EXPECT_CALL(*rawPacketProcessor, onPacketSent(_)) + .Times(2) + .WillOnce(Invoke([&](auto outstandingPacket) { + EXPECT_LE( + thirdPacketTotalAppLimitedTimeUsecs + 10ms, + outstandingPacket.metadata.totalAppLimitedTimeUsecs); + penultimatePacketTotalAppLimitedTimeUsecs = + outstandingPacket.metadata.totalAppLimitedTimeUsecs; + })) + .WillOnce(Invoke([&](auto outstandingPacket) { + EXPECT_EQ( + penultimatePacketTotalAppLimitedTimeUsecs, + outstandingPacket.metadata.totalAppLimitedTimeUsecs); + })); + + const auto bufLength = 1700; + auto buf = buildRandomInputData(bufLength); + this->getTransport()->writeChain(streamId, std::move(buf), false); + const auto maybeWrittenPackets1 = this->loopForWrites(); + + // should have sent two packets + ASSERT_TRUE(maybeWrittenPackets1.has_value()); + quic::PacketNum firstPacketNum = maybeWrittenPackets1->start; + quic::PacketNum lastPacketNum = maybeWrittenPackets1->end; + EXPECT_EQ(2, lastPacketNum - firstPacketNum + 1); + } + + this->destroyTransport(); +} + TYPED_TEST( QuicTypedTransportAfterStartTest, StreamAckedIntervalsDeliveryCallbacks) { diff --git a/quic/state/OutstandingPacket.h b/quic/state/OutstandingPacket.h index 5cd98f86b..70fbc9f8f 100644 --- a/quic/state/OutstandingPacket.h +++ b/quic/state/OutstandingPacket.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace quic { @@ -104,6 +105,10 @@ struct OutstandingPacketMetadata { // Details about each stream with frames in this packet DetailsPerStream detailsPerStream; + // Total time spent app limited on this connection including when this packet + // was sent. + std::chrono::microseconds totalAppLimitedTimeUsecs{0}; + OutstandingPacketMetadata( TimePoint timeIn, uint32_t encodedSizeIn, @@ -116,7 +121,8 @@ struct OutstandingPacketMetadata { uint64_t packetsInflightIn, const LossState& lossStateIn, uint64_t writeCount, - DetailsPerStream detailsPerStream) + DetailsPerStream detailsPerStream, + std::chrono::microseconds totalAppLimitedTimeUsecsIn = 0us) : time(timeIn), encodedSize(encodedSizeIn), encodedBodySize(encodedBodySizeIn), @@ -129,7 +135,8 @@ struct OutstandingPacketMetadata { totalPacketsSent(lossStateIn.totalPacketsSent), totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent), writeCount(writeCount), - detailsPerStream(std::move(detailsPerStream)) {} + detailsPerStream(std::move(detailsPerStream)), + totalAppLimitedTimeUsecs(totalAppLimitedTimeUsecsIn) {} }; // Data structure to represent outstanding retransmittable packets @@ -206,7 +213,8 @@ struct OutstandingPacket { uint64_t packetsInflightIn, const LossState& lossStateIn, uint64_t writeCount, - Metadata::DetailsPerStream detailsPerStream) + Metadata::DetailsPerStream detailsPerStream, + std::chrono::microseconds totalAppLimitedTimeUsecs = 0us) : packet(std::move(packetIn)), metadata(OutstandingPacketMetadata( timeIn, @@ -220,7 +228,8 @@ struct OutstandingPacket { packetsInflightIn, lossStateIn, writeCount, - std::move(detailsPerStream))) {} + std::move(detailsPerStream), + totalAppLimitedTimeUsecs)) {} OutstandingPacket( RegularQuicWritePacket packetIn, @@ -235,7 +244,8 @@ struct OutstandingPacket { uint64_t packetsInflightIn, const LossState& lossStateIn, uint64_t writeCount, - Metadata::DetailsPerStream detailsPerStream) + Metadata::DetailsPerStream detailsPerStream, + std::chrono::microseconds totalAppLimitedTimeUsecs = 0us) : packet(std::move(packetIn)), metadata(OutstandingPacketMetadata( timeIn, @@ -249,6 +259,7 @@ struct OutstandingPacket { packetsInflightIn, lossStateIn, writeCount, - std::move(detailsPerStream))) {} + std::move(detailsPerStream), + totalAppLimitedTimeUsecs)) {} }; } // namespace quic diff --git a/quic/state/StateData.h b/quic/state/StateData.h index 21027829d..d0b99af83 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -142,6 +142,65 @@ struct OutstandingsInfo { } }; +class AppLimitedTracker { + public: + using Clock = std::chrono::steady_clock; + + /** + * Mark the connection as application limited. + */ + void setAppLimited() { + DCHECK(!isAppLimited_); + isAppLimited_ = true; + appLimitedStartTime_ = Clock::now(); + } + + /** + * Mark the connection as not being application limited. + */ + void setNotAppLimited() { + DCHECK(isAppLimited_); + isAppLimited_ = false; + totalAppLimitedTime_ += + std::chrono::duration_cast( + Clock::now() - appLimitedStartTime_); + } + + /** + * Returns whether the connection has been marked as appplication limited. + */ + bool isAppLimited() { + return isAppLimited_; + } + + /** + * Returns total time connection has spent application limited. + * + * If connection is currently application limited, the time in the current + * application limited period is included. + */ + std::chrono::microseconds getTotalAppLimitedTime() { + if (isAppLimited_) { + return std::chrono::duration_cast( + Clock::now() - appLimitedStartTime_) + + totalAppLimitedTime_; + } + return totalAppLimitedTime_; + } + + private: + // Total time spent application limited, excluding now. + std::chrono::microseconds totalAppLimitedTime_{0us}; + + // Whether we're currently application limited. + // Initialize to true since all connections start off application limited. + bool isAppLimited_{true}; + + // When we last became application limited. + // Initialize to now() since all connections start off application limited. + Clock::time_point appLimitedStartTime_{Clock::now()}; +}; + struct Pacer { virtual ~Pacer() = default; @@ -689,10 +748,8 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction { // For example, we may not want to pace a connection that's still handshaking. bool canBePaced{false}; - // Flag indicating whether the socket is currently waiting for the app to - // write data. All new sockets start off in this state where they wait for the - // application to pump data to the socket. - bool waitingForAppData{true}; + // Tracking of application limited time. + AppLimitedTracker appLimitedTracker; // Monotonically increasing counter that is incremented each time there is a // write on this socket (writeSocketData() is called), This is used to diff --git a/quic/state/test/StateDataTest.cpp b/quic/state/test/StateDataTest.cpp index 65eac2a60..d0d491e77 100644 --- a/quic/state/test/StateDataTest.cpp +++ b/quic/state/test/StateDataTest.cpp @@ -89,6 +89,49 @@ TEST_F(StateDataTest, CongestionControllerState) { } } +TEST_F(StateDataTest, AppLimitedTracker) { + AppLimitedTracker tracker; + + // initialized to app limited + EXPECT_TRUE(tracker.isAppLimited()); + + // if app limited, getTotalAppLimitedTime includes current app limited time + { + const auto totalAppLimitedTime1 = tracker.getTotalAppLimitedTime(); + std::this_thread::sleep_for(10ms); + const auto totalAppLimitedTime2 = tracker.getTotalAppLimitedTime(); + + EXPECT_LE(totalAppLimitedTime1, totalAppLimitedTime2); + EXPECT_GE(totalAppLimitedTime2, totalAppLimitedTime1 + 10ms); + } + + // when we become non-app limited, we properly track time spent app limited + { + const auto totalAppLimitedTime1 = tracker.getTotalAppLimitedTime(); + tracker.setNotAppLimited(); + EXPECT_LE(totalAppLimitedTime1, tracker.getTotalAppLimitedTime()); + } + + // if we become app limited again, total time is >= existing time + { + const auto totalAppLimitedTime1 = tracker.getTotalAppLimitedTime(); + tracker.setAppLimited(); + EXPECT_LE(totalAppLimitedTime1, tracker.getTotalAppLimitedTime()); + std::this_thread::sleep_for(10ms); + + const auto totalAppLimitedTime2 = tracker.getTotalAppLimitedTime(); + EXPECT_LE(totalAppLimitedTime1, totalAppLimitedTime2); + EXPECT_GE(totalAppLimitedTime2, totalAppLimitedTime1 + 10ms); + } + + // when we become non-app limited, we properly track time spent app limited + { + const auto totalAppLimitedTime1 = tracker.getTotalAppLimitedTime(); + tracker.setNotAppLimited(); + EXPECT_LE(totalAppLimitedTime1, tracker.getTotalAppLimitedTime()); + } +} + TEST_F(StateDataTest, EmptyLossEvent) { CongestionController::LossEvent loss; EXPECT_EQ(0, loss.lostBytes);