From d2f005dc00b26461d674247fc84d58f7a6290c53 Mon Sep 17 00:00:00 2001 From: Sridhar Srinivasan Date: Wed, 10 Mar 2021 13:03:30 -0800 Subject: [PATCH] Add support for detecting start and stop of app rate limited scenarios Summary: Previously, we only had support for notifying observers when a QUIC socket became application rate limited. This change adds a similar notification when a socket does not become application rate limited, i.e when the application *starts* writing data to the socket - either for the first time on a newly established socket or after a previous block of writes were written and the app had no more to write. In addition, we include the number of outstanding packets (only those that are carrying app data in them) so that observers can use this data to timestamp the start and end of periods where the socket performs app data writes. Reviewed By: yangchi Differential Revision: D26559598 fbshipit-source-id: 0a8df7082b83e2ffad9b5addceca29cc03897243 --- quic/api/Observer.h | 62 ++++++++++++++++++++++- quic/api/QuicSocket.h | 1 + quic/api/QuicTransportBase.cpp | 31 +++++++++++- quic/api/QuicTransportFunctions.cpp | 4 +- quic/api/test/Mocks.h | 19 ++++++- quic/api/test/QuicPacketSchedulerTest.cpp | 6 +-- quic/api/test/QuicTransportTest.cpp | 19 +++++-- quic/common/test/TestUtils.cpp | 6 ++- quic/common/test/TestUtils.h | 3 +- quic/state/OutstandingPacket.h | 22 +++++--- quic/state/StateData.h | 13 +++++ quic/tools/tperf/tperf.cpp | 4 +- 12 files changed, 168 insertions(+), 22 deletions(-) diff --git a/quic/api/Observer.h b/quic/api/Observer.h index 6b3702528..4eac4c252 100644 --- a/quic/api/Observer.h +++ b/quic/api/Observer.h @@ -37,6 +37,7 @@ class Observer { // following flags enable support for various callbacks. // observer and socket lifecycle callbacks are always enabled. bool evbEvents{false}; + bool appDataSentEvents{false}; bool appLimitedEvents{false}; bool lossEvents{false}; bool spuriousLossEvents{false}; @@ -50,6 +51,7 @@ class Observer { static Config getConfigAllEventsEnabled() { Config config = {}; config.evbEvents = true; + config.appDataSentEvents = true; config.appLimitedEvents = true; config.rttSamples = true; config.lossEvents = true; @@ -82,6 +84,24 @@ class Observer { return observerConfig_; } + struct AppLimitedEvent { + AppLimitedEvent( + const std::deque& outstandingPackets, + const uint64_t writeCount) + : outstandingPackets(outstandingPackets), writeCount(writeCount) {} + + // Cannot support copy ctors safely + AppLimitedEvent(const AppLimitedEvent&) = delete; + AppLimitedEvent(AppLimitedEvent&&) = delete; + + // Reference to the current list of outstanding packets + const std::deque& outstandingPackets; + // The current write number for the write() call that + // caused this appLimitedEvent. Used by Observers to identify specific + // packets from the outstandingPacket list (above). + const uint64_t writeCount; + }; + struct LostPacket { explicit LostPacket( bool lostbytimeout, @@ -273,12 +293,52 @@ class Observer { QuicSocket* /* socket */, folly::EventBase* /* evb */) noexcept {} + /** + * startWritingFromAppLimited() is invoked when the socket is currenty + * app rate limited and is being asked to write a new set of Bytes. + . This callback is invoked BEFORE we write the + * new bytes to the socket, this is done so that Observers can collect + * metadata about the connection BEFORE the start of a potential Write Block. + * + * @param socket Socket that has potentially sent application data. + reference to the number of outstanding packets + BEFORE the call to write() socket data. + + */ + virtual void startWritingFromAppLimited( + QuicSocket* /* socket */, + const AppLimitedEvent& /* appLimitedEvent */) {} + + /** + * packetsWritten() is invoked when the socket writes retransmittable packets + * to the wire, those packets will be present in the outstanding packets list. + * Observers can use this callback (which will always be invoked AFTER + * startWritingFromAppLimited) to *update* the transport's metadata within the + * currently tracked WriteBlock. + * + * If an Observer receives startWritingFromAppLimited but doesn't receive + * packetsWritten (and instead directly receives appRateLimited), it means the + * socket witnessed non-app data writes - this scenario is irrelevant and + * should not be used to construct Write Blocks. + * + * @param socket Socket that has sent application data. + * @param appLimitedEvent The AppLimitedEvent details which contains a const + reference to the number of outstanding packets + */ + virtual void packetsWritten( + QuicSocket* /* socket */, + const AppLimitedEvent& /* appLimitedEvent */) {} + /** * appRateLimited() is invoked when the socket is app rate limited. * * @param socket Socket that has become application rate limited. + * @param appLimitedEvent The AppLimitedEvent details which contains a const + reference to the number of outstanding packets */ - virtual void appRateLimited(QuicSocket* /* socket */) {} + virtual void appRateLimited( + QuicSocket* /* socket */, + const AppLimitedEvent& /* appLimitedEvent */) {} /** * packetLossDetected() is invoked when a packet loss is detected. diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index 9e17ef82b..cd4f944ad 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -145,6 +145,7 @@ class QuicSocket { uint64_t bytesSent{0}; uint64_t bytesAcked{0}; uint64_t bytesRecvd{0}; + uint64_t bytesInFlight{0}; uint64_t totalBytesRetransmitted{0}; uint32_t ptoCount{0}; uint32_t totalPTOCount{0}; diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 257bd30ca..77fd5a7e1 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -602,6 +602,7 @@ QuicSocket::TransportInfo QuicTransportBase::getTransportInfo() const { transportInfo.bytesSent = conn_->lossState.totalBytesSent; transportInfo.bytesAcked = conn_->lossState.totalBytesAcked; transportInfo.bytesRecvd = conn_->lossState.totalBytesRecvd; + transportInfo.bytesInFlight = conn_->lossState.inflightBytes; transportInfo.ptoCount = conn_->lossState.ptoCount; transportInfo.totalPTOCount = conn_->lossState.totalPTOCount; transportInfo.largestPacketAckedByPeer = @@ -2660,7 +2661,20 @@ QuicConnectionStats QuicTransportBase::getConnectionsStats() const { void QuicTransportBase::writeSocketData() { if (socket_) { + // record this invocation of a new write to the socket + ++(conn_->writeCount); auto packetsBefore = conn_->outstandings.numOutstanding(); + // Signal Observers the POTENTIAL start of a Write Block. + if (conn_->waitingForAppData && conn_->congestionController) { + Observer::AppLimitedEvent startWritingFromAppLimitedEvent( + conn_->outstandings.packets, conn_->writeCount); + for (const auto& cb : *observers_) { + if (cb->getConfig().appDataSentEvents) { + cb->startWritingFromAppLimited(this, startWritingFromAppLimitedEvent); + } + } + conn_->waitingForAppData = false; + } writeData(); if (closeState_ != CloseState::CLOSED) { if (conn_->pendingEvents.closeTransport == true) { @@ -2671,6 +2685,18 @@ void QuicTransportBase::writeSocketData() { setLossDetectionAlarm(*conn_, *this); auto packetsAfter = conn_->outstandings.numOutstanding(); bool packetWritten = (packetsAfter > packetsBefore); + // Signal the Observers that *some* packets were written + // These may/may not be app data packets, it it up to the Observers + // to deal with these packets. + if (packetWritten && conn_->congestionController) { + Observer::AppLimitedEvent packetsWrittenEvent( + conn_->outstandings.packets, conn_->writeCount); + for (const auto& cb : *observers_) { + if (cb->getConfig().appDataSentEvents) { + cb->packetsWritten(this, packetsWrittenEvent); + } + } + } if (conn_->loopDetectorCallback && packetWritten) { conn_->writeDebugState.currentEmptyLoopCount = 0; } else if ( @@ -2706,11 +2732,14 @@ void QuicTransportBase::writeSocketData() { conn_->congestionController->setAppLimited(); // notify via connection call and any observer callbacks connCallback_->onAppRateLimited(); + Observer::AppLimitedEvent appLimitedEvent( + conn_->outstandings.packets, conn_->writeCount); for (const auto& cb : *observers_) { if (cb->getConfig().appLimitedEvents) { - cb->appRateLimited(this); + cb->appRateLimited(this, appLimitedEvent); } } + conn_->waitingForAppData = true; } } } diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 5e60017be..32cce38ad 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -666,6 +666,7 @@ void updateConnection( return; } conn.lossState.totalAckElicitingPacketsSent++; + auto packetIt = std::find_if( conn.outstandings.packets.rbegin(), @@ -688,7 +689,8 @@ void updateConnection( conn.lossState.totalBytesSent, conn.lossState.inflightBytes + encodedSize, conn.outstandings.numOutstanding() + 1, - conn.lossState); + conn.lossState, + conn.writeCount); if (isD6DProbe) { ++conn.d6d.outstandingProbes; diff --git a/quic/api/test/Mocks.h b/quic/api/test/Mocks.h index 64d427293..3557a1a8b 100644 --- a/quic/api/test/Mocks.h +++ b/quic/api/test/Mocks.h @@ -323,7 +323,24 @@ class MockObserver : public Observer { void( QuicSocket*, const folly::Optional>&)); - GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*)); + GMOCK_METHOD2_( + , + noexcept, + , + startWritingFromAppLimited, + void(QuicSocket*, const AppLimitedEvent&)); + GMOCK_METHOD2_( + , + noexcept, + , + packetsWritten, + void(QuicSocket*, const AppLimitedEvent&)); + GMOCK_METHOD2_( + , + noexcept, + , + appRateLimited, + void(QuicSocket*, const AppLimitedEvent&)); GMOCK_METHOD2_( , noexcept, diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 4ff590b95..389f9462c 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -41,7 +41,7 @@ PacketNum addInitialOutstandingPacket(QuicConnectionStateBase& conn) { QuicVersion::QUIC_DRAFT); RegularQuicWritePacket packet(std::move(header)); conn.outstandings.packets.emplace_back( - packet, Clock::now(), 0, true, 0, 0, 0, LossState()); + packet, Clock::now(), 0, true, 0, 0, 0, LossState(), 0); conn.outstandings.handshakePacketsCount++; increaseNextPacketNum(conn, PacketNumberSpace::Handshake); return nextPacketNum; @@ -60,7 +60,7 @@ PacketNum addHandshakeOutstandingPacket(QuicConnectionStateBase& conn) { QuicVersion::QUIC_DRAFT); RegularQuicWritePacket packet(std::move(header)); conn.outstandings.packets.emplace_back( - packet, Clock::now(), 0, true, 0, 0, 0, LossState()); + packet, Clock::now(), 0, true, 0, 0, 0, LossState(), 0); conn.outstandings.handshakePacketsCount++; increaseNextPacketNum(conn, PacketNumberSpace::Handshake); return nextPacketNum; @@ -74,7 +74,7 @@ PacketNum addOutstandingPacket(QuicConnectionStateBase& conn) { nextPacketNum); RegularQuicWritePacket packet(std::move(header)); conn.outstandings.packets.emplace_back( - packet, Clock::now(), 0, false, 0, 0, 0, LossState()); + packet, Clock::now(), 0, false, 0, 0, 0, LossState(), 0); increaseNextPacketNum(conn, PacketNumberSpace::AppData); return nextPacketNum; } diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 737286ae9..b67775191 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -359,6 +359,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) { Observer::Config config = {}; config.appLimitedEvents = true; + config.appDataSentEvents = true; auto cb = std::make_unique>(config); EXPECT_CALL(*cb, observerAttach(transport_.get())); @@ -367,7 +368,9 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) { auto stream = transport_->createBidirectionalStream().value(); transport_->writeChain( stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); - EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0); + EXPECT_CALL(*cb, startWritingFromAppLimited(transport_.get(), _)); + EXPECT_CALL(*cb, packetsWritten(transport_.get(), _)); + EXPECT_CALL(*cb, appRateLimited(transport_.get(), _)).Times(0); loopForWrites(); Mock::VerifyAndClearExpectations(cb.get()); EXPECT_CALL(*cb, close(transport_.get(), _)).Times(3); @@ -389,6 +392,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) { Observer::Config config = {}; config.appLimitedEvents = true; + config.appDataSentEvents = true; auto cb = std::make_unique>(config); EXPECT_CALL(*cb, observerAttach(transport_.get())); @@ -397,7 +401,9 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) { auto stream = transport_->createBidirectionalStream().value(); auto buf = buildRandomInputData(100 * 2000); transport_->writeChain(stream, buf->clone(), false, nullptr); - EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0); + EXPECT_CALL(*cb, startWritingFromAppLimited(transport_.get(), _)); + EXPECT_CALL(*cb, packetsWritten(transport_.get(), _)); + EXPECT_CALL(*cb, appRateLimited(transport_.get(), _)).Times(0); loopForWrites(); Mock::VerifyAndClearExpectations(cb.get()); EXPECT_CALL(*cb, close(transport_.get(), _)).Times(3); @@ -410,6 +416,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) { TEST_F(QuicTransportTest, AppLimitedWithObservers) { Observer::Config config = {}; config.appLimitedEvents = true; + config.appDataSentEvents = true; auto cb1 = std::make_unique>(config); auto cb2 = std::make_unique>(config); EXPECT_CALL(*cb1, observerAttach(transport_.get())); @@ -430,8 +437,12 @@ TEST_F(QuicTransportTest, AppLimitedWithObservers) { transport_->writeChain( stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(1); - EXPECT_CALL(*cb1, appRateLimited(transport_.get())); - EXPECT_CALL(*cb2, appRateLimited(transport_.get())); + EXPECT_CALL(*cb1, startWritingFromAppLimited(transport_.get(), _)); + EXPECT_CALL(*cb1, packetsWritten(transport_.get(), _)); + EXPECT_CALL(*cb1, appRateLimited(transport_.get(), _)); + EXPECT_CALL(*cb2, startWritingFromAppLimited(transport_.get(), _)); + EXPECT_CALL(*cb2, packetsWritten(transport_.get(), _)); + EXPECT_CALL(*cb2, appRateLimited(transport_.get(), _)); loopForWrites(); Mock::VerifyAndClearExpectations(cb1.get()); Mock::VerifyAndClearExpectations(cb2.get()); diff --git a/quic/common/test/TestUtils.cpp b/quic/common/test/TestUtils.cpp index e3988ca12..f0768cf57 100644 --- a/quic/common/test/TestUtils.cpp +++ b/quic/common/test/TestUtils.cpp @@ -564,7 +564,8 @@ OutstandingPacket makeTestingWritePacket( size_t desiredSize, uint64_t totalBytesSent, TimePoint sentTime /* = Clock::now() */, - uint64_t inflightBytes /* = 0 */) { + uint64_t inflightBytes /* = 0 */, + uint64_t writeCount /* = 0 */) { LongHeader longHeader( LongHeader::Types::ZeroRtt, getTestConnectionId(1), @@ -580,7 +581,8 @@ OutstandingPacket makeTestingWritePacket( totalBytesSent, inflightBytes, 0, - LossState()); + LossState(), + writeCount); } CongestionController::AckEvent makeAck( diff --git a/quic/common/test/TestUtils.h b/quic/common/test/TestUtils.h index 4f55a65f7..590c14645 100644 --- a/quic/common/test/TestUtils.h +++ b/quic/common/test/TestUtils.h @@ -222,7 +222,8 @@ OutstandingPacket makeTestingWritePacket( size_t desiredSize, uint64_t totalBytesSent, TimePoint sentTime = Clock::now(), - uint64_t inflightBytes = 0); + uint64_t inflightBytes = 0, + uint64_t writeCount = 0); // TODO: The way we setup packet sent, ack, loss in test cases can use some // major refactor. diff --git a/quic/state/OutstandingPacket.h b/quic/state/OutstandingPacket.h index 7c6497042..956ff58cf 100644 --- a/quic/state/OutstandingPacket.h +++ b/quic/state/OutstandingPacket.h @@ -35,6 +35,9 @@ struct OutstandingPacketMetadata { uint32_t totalPacketsSent{0}; // Total number of ack-eliciting packets sent on this connection. uint32_t totalAckElicitingPacketsSent{0}; + // Write Count is the value of the monotonically increasing counter which + // tracks the number of writes on this socket. + uint64_t writeCount{0}; OutstandingPacketMetadata( TimePoint timeIn, @@ -44,7 +47,8 @@ struct OutstandingPacketMetadata { uint64_t totalBytesSentIn, uint64_t inflightBytesIn, uint64_t packetsInflightIn, - const LossState& lossStateIn) + const LossState& lossStateIn, + uint64_t writeCount) : time(timeIn), encodedSize(encodedSizeIn), isHandshake(isHandshakeIn), @@ -53,8 +57,8 @@ struct OutstandingPacketMetadata { inflightBytes(inflightBytesIn), packetsInflight(packetsInflightIn), totalPacketsSent(lossStateIn.totalPacketsSent), - totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent) { - } + totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent), + writeCount(writeCount) {} }; // Data structure to represent outstanding retransmittable packets @@ -119,7 +123,8 @@ struct OutstandingPacket { uint64_t totalBytesSentIn, uint64_t inflightBytesIn, uint64_t packetsInflightIn, - const LossState& lossStateIn) + const LossState& lossStateIn, + uint64_t writeCount = 0) : packet(std::move(packetIn)), metadata(OutstandingPacketMetadata( timeIn, @@ -129,7 +134,8 @@ struct OutstandingPacket { totalBytesSentIn, inflightBytesIn, packetsInflightIn, - lossStateIn)) {} + lossStateIn, + writeCount)) {} OutstandingPacket( RegularQuicWritePacket packetIn, @@ -140,7 +146,8 @@ struct OutstandingPacket { uint64_t totalBytesSentIn, uint64_t inflightBytesIn, uint64_t packetsInflightIn, - const LossState& lossStateIn) + const LossState& lossStateIn, + uint64_t writeCount = 0) : packet(std::move(packetIn)), metadata(OutstandingPacketMetadata( timeIn, @@ -150,6 +157,7 @@ struct OutstandingPacket { totalBytesSentIn, inflightBytesIn, packetsInflightIn, - lossStateIn)) {} + lossStateIn, + writeCount)) {} }; } // namespace quic diff --git a/quic/state/StateData.h b/quic/state/StateData.h index 44b3841ce..9e28289d4 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -821,6 +821,19 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction { // Whether a connection can be paced based on its handshake and close states. // For example, we may not want to pace a connection that's still handshaking. bool canBePaced{false}; + + // Flag indicating whether the socket is currently waiting for the app to + // write data. All new sockets start off in this state where they wait for the + // application to pump data to the socket. + // TODO: Merge this flag with the existing appLimited flag that exists on each + // Congestion Controller. + bool waitingForAppData{true}; + + // Monotonically increasing counter that is incremented each time there is a + // write on this socket (writeSocketData() is called), This is used to + // identify specific outstanding packets (based on writeCount and packetNum) + // in the Observers, to construct Write Blocks + uint64_t writeCount{0}; }; std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st); diff --git a/quic/tools/tperf/tperf.cpp b/quic/tools/tperf/tperf.cpp index 6aedf9ebc..00af5d348 100644 --- a/quic/tools/tperf/tperf.cpp +++ b/quic/tools/tperf/tperf.cpp @@ -134,7 +134,9 @@ ProbeSizeRaiserType parseRaiserType(uint32_t type) { class TPerfObserver : public Observer { public: TPerfObserver(const Observer::Config& config) : Observer(config) {} - void appRateLimited(QuicSocket* /* socket */) override { + void appRateLimited( + QuicSocket* /* socket */, + const quic::Observer::AppLimitedEvent& /* appLimitedEvent */) override { if (FLAGS_log_app_rate_limited) { LOG(INFO) << "appRateLimited detected"; }