diff --git a/quic/api/Observer.h b/quic/api/Observer.h index 6b3702528..4eac4c252 100644 --- a/quic/api/Observer.h +++ b/quic/api/Observer.h @@ -37,6 +37,7 @@ class Observer { // following flags enable support for various callbacks. // observer and socket lifecycle callbacks are always enabled. bool evbEvents{false}; + bool appDataSentEvents{false}; bool appLimitedEvents{false}; bool lossEvents{false}; bool spuriousLossEvents{false}; @@ -50,6 +51,7 @@ class Observer { static Config getConfigAllEventsEnabled() { Config config = {}; config.evbEvents = true; + config.appDataSentEvents = true; config.appLimitedEvents = true; config.rttSamples = true; config.lossEvents = true; @@ -82,6 +84,24 @@ class Observer { return observerConfig_; } + struct AppLimitedEvent { + AppLimitedEvent( + const std::deque& outstandingPackets, + const uint64_t writeCount) + : outstandingPackets(outstandingPackets), writeCount(writeCount) {} + + // Cannot support copy ctors safely + AppLimitedEvent(const AppLimitedEvent&) = delete; + AppLimitedEvent(AppLimitedEvent&&) = delete; + + // Reference to the current list of outstanding packets + const std::deque& outstandingPackets; + // The current write number for the write() call that + // caused this appLimitedEvent. Used by Observers to identify specific + // packets from the outstandingPacket list (above). + const uint64_t writeCount; + }; + struct LostPacket { explicit LostPacket( bool lostbytimeout, @@ -273,12 +293,52 @@ class Observer { QuicSocket* /* socket */, folly::EventBase* /* evb */) noexcept {} + /** + * startWritingFromAppLimited() is invoked when the socket is currenty + * app rate limited and is being asked to write a new set of Bytes. + . This callback is invoked BEFORE we write the + * new bytes to the socket, this is done so that Observers can collect + * metadata about the connection BEFORE the start of a potential Write Block. + * + * @param socket Socket that has potentially sent application data. + reference to the number of outstanding packets + BEFORE the call to write() socket data. + + */ + virtual void startWritingFromAppLimited( + QuicSocket* /* socket */, + const AppLimitedEvent& /* appLimitedEvent */) {} + + /** + * packetsWritten() is invoked when the socket writes retransmittable packets + * to the wire, those packets will be present in the outstanding packets list. + * Observers can use this callback (which will always be invoked AFTER + * startWritingFromAppLimited) to *update* the transport's metadata within the + * currently tracked WriteBlock. + * + * If an Observer receives startWritingFromAppLimited but doesn't receive + * packetsWritten (and instead directly receives appRateLimited), it means the + * socket witnessed non-app data writes - this scenario is irrelevant and + * should not be used to construct Write Blocks. + * + * @param socket Socket that has sent application data. + * @param appLimitedEvent The AppLimitedEvent details which contains a const + reference to the number of outstanding packets + */ + virtual void packetsWritten( + QuicSocket* /* socket */, + const AppLimitedEvent& /* appLimitedEvent */) {} + /** * appRateLimited() is invoked when the socket is app rate limited. * * @param socket Socket that has become application rate limited. + * @param appLimitedEvent The AppLimitedEvent details which contains a const + reference to the number of outstanding packets */ - virtual void appRateLimited(QuicSocket* /* socket */) {} + virtual void appRateLimited( + QuicSocket* /* socket */, + const AppLimitedEvent& /* appLimitedEvent */) {} /** * packetLossDetected() is invoked when a packet loss is detected. diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index 9e17ef82b..cd4f944ad 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -145,6 +145,7 @@ class QuicSocket { uint64_t bytesSent{0}; uint64_t bytesAcked{0}; uint64_t bytesRecvd{0}; + uint64_t bytesInFlight{0}; uint64_t totalBytesRetransmitted{0}; uint32_t ptoCount{0}; uint32_t totalPTOCount{0}; diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 257bd30ca..77fd5a7e1 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -602,6 +602,7 @@ QuicSocket::TransportInfo QuicTransportBase::getTransportInfo() const { transportInfo.bytesSent = conn_->lossState.totalBytesSent; transportInfo.bytesAcked = conn_->lossState.totalBytesAcked; transportInfo.bytesRecvd = conn_->lossState.totalBytesRecvd; + transportInfo.bytesInFlight = conn_->lossState.inflightBytes; transportInfo.ptoCount = conn_->lossState.ptoCount; transportInfo.totalPTOCount = conn_->lossState.totalPTOCount; transportInfo.largestPacketAckedByPeer = @@ -2660,7 +2661,20 @@ QuicConnectionStats QuicTransportBase::getConnectionsStats() const { void QuicTransportBase::writeSocketData() { if (socket_) { + // record this invocation of a new write to the socket + ++(conn_->writeCount); auto packetsBefore = conn_->outstandings.numOutstanding(); + // Signal Observers the POTENTIAL start of a Write Block. + if (conn_->waitingForAppData && conn_->congestionController) { + Observer::AppLimitedEvent startWritingFromAppLimitedEvent( + conn_->outstandings.packets, conn_->writeCount); + for (const auto& cb : *observers_) { + if (cb->getConfig().appDataSentEvents) { + cb->startWritingFromAppLimited(this, startWritingFromAppLimitedEvent); + } + } + conn_->waitingForAppData = false; + } writeData(); if (closeState_ != CloseState::CLOSED) { if (conn_->pendingEvents.closeTransport == true) { @@ -2671,6 +2685,18 @@ void QuicTransportBase::writeSocketData() { setLossDetectionAlarm(*conn_, *this); auto packetsAfter = conn_->outstandings.numOutstanding(); bool packetWritten = (packetsAfter > packetsBefore); + // Signal the Observers that *some* packets were written + // These may/may not be app data packets, it it up to the Observers + // to deal with these packets. + if (packetWritten && conn_->congestionController) { + Observer::AppLimitedEvent packetsWrittenEvent( + conn_->outstandings.packets, conn_->writeCount); + for (const auto& cb : *observers_) { + if (cb->getConfig().appDataSentEvents) { + cb->packetsWritten(this, packetsWrittenEvent); + } + } + } if (conn_->loopDetectorCallback && packetWritten) { conn_->writeDebugState.currentEmptyLoopCount = 0; } else if ( @@ -2706,11 +2732,14 @@ void QuicTransportBase::writeSocketData() { conn_->congestionController->setAppLimited(); // notify via connection call and any observer callbacks connCallback_->onAppRateLimited(); + Observer::AppLimitedEvent appLimitedEvent( + conn_->outstandings.packets, conn_->writeCount); for (const auto& cb : *observers_) { if (cb->getConfig().appLimitedEvents) { - cb->appRateLimited(this); + cb->appRateLimited(this, appLimitedEvent); } } + conn_->waitingForAppData = true; } } } diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 5e60017be..32cce38ad 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -666,6 +666,7 @@ void updateConnection( return; } conn.lossState.totalAckElicitingPacketsSent++; + auto packetIt = std::find_if( conn.outstandings.packets.rbegin(), @@ -688,7 +689,8 @@ void updateConnection( conn.lossState.totalBytesSent, conn.lossState.inflightBytes + encodedSize, conn.outstandings.numOutstanding() + 1, - conn.lossState); + conn.lossState, + conn.writeCount); if (isD6DProbe) { ++conn.d6d.outstandingProbes; diff --git a/quic/api/test/Mocks.h b/quic/api/test/Mocks.h index 64d427293..3557a1a8b 100644 --- a/quic/api/test/Mocks.h +++ b/quic/api/test/Mocks.h @@ -323,7 +323,24 @@ class MockObserver : public Observer { void( QuicSocket*, const folly::Optional>&)); - GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*)); + GMOCK_METHOD2_( + , + noexcept, + , + startWritingFromAppLimited, + void(QuicSocket*, const AppLimitedEvent&)); + GMOCK_METHOD2_( + , + noexcept, + , + packetsWritten, + void(QuicSocket*, const AppLimitedEvent&)); + GMOCK_METHOD2_( + , + noexcept, + , + appRateLimited, + void(QuicSocket*, const AppLimitedEvent&)); GMOCK_METHOD2_( , noexcept, diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 4ff590b95..389f9462c 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -41,7 +41,7 @@ PacketNum addInitialOutstandingPacket(QuicConnectionStateBase& conn) { QuicVersion::QUIC_DRAFT); RegularQuicWritePacket packet(std::move(header)); conn.outstandings.packets.emplace_back( - packet, Clock::now(), 0, true, 0, 0, 0, LossState()); + packet, Clock::now(), 0, true, 0, 0, 0, LossState(), 0); conn.outstandings.handshakePacketsCount++; increaseNextPacketNum(conn, PacketNumberSpace::Handshake); return nextPacketNum; @@ -60,7 +60,7 @@ PacketNum addHandshakeOutstandingPacket(QuicConnectionStateBase& conn) { QuicVersion::QUIC_DRAFT); RegularQuicWritePacket packet(std::move(header)); conn.outstandings.packets.emplace_back( - packet, Clock::now(), 0, true, 0, 0, 0, LossState()); + packet, Clock::now(), 0, true, 0, 0, 0, LossState(), 0); conn.outstandings.handshakePacketsCount++; increaseNextPacketNum(conn, PacketNumberSpace::Handshake); return nextPacketNum; @@ -74,7 +74,7 @@ PacketNum addOutstandingPacket(QuicConnectionStateBase& conn) { nextPacketNum); RegularQuicWritePacket packet(std::move(header)); conn.outstandings.packets.emplace_back( - packet, Clock::now(), 0, false, 0, 0, 0, LossState()); + packet, Clock::now(), 0, false, 0, 0, 0, LossState(), 0); increaseNextPacketNum(conn, PacketNumberSpace::AppData); return nextPacketNum; } diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 737286ae9..b67775191 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -359,6 +359,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) { Observer::Config config = {}; config.appLimitedEvents = true; + config.appDataSentEvents = true; auto cb = std::make_unique>(config); EXPECT_CALL(*cb, observerAttach(transport_.get())); @@ -367,7 +368,9 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) { auto stream = transport_->createBidirectionalStream().value(); transport_->writeChain( stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); - EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0); + EXPECT_CALL(*cb, startWritingFromAppLimited(transport_.get(), _)); + EXPECT_CALL(*cb, packetsWritten(transport_.get(), _)); + EXPECT_CALL(*cb, appRateLimited(transport_.get(), _)).Times(0); loopForWrites(); Mock::VerifyAndClearExpectations(cb.get()); EXPECT_CALL(*cb, close(transport_.get(), _)).Times(3); @@ -389,6 +392,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) { Observer::Config config = {}; config.appLimitedEvents = true; + config.appDataSentEvents = true; auto cb = std::make_unique>(config); EXPECT_CALL(*cb, observerAttach(transport_.get())); @@ -397,7 +401,9 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) { auto stream = transport_->createBidirectionalStream().value(); auto buf = buildRandomInputData(100 * 2000); transport_->writeChain(stream, buf->clone(), false, nullptr); - EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0); + EXPECT_CALL(*cb, startWritingFromAppLimited(transport_.get(), _)); + EXPECT_CALL(*cb, packetsWritten(transport_.get(), _)); + EXPECT_CALL(*cb, appRateLimited(transport_.get(), _)).Times(0); loopForWrites(); Mock::VerifyAndClearExpectations(cb.get()); EXPECT_CALL(*cb, close(transport_.get(), _)).Times(3); @@ -410,6 +416,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) { TEST_F(QuicTransportTest, AppLimitedWithObservers) { Observer::Config config = {}; config.appLimitedEvents = true; + config.appDataSentEvents = true; auto cb1 = std::make_unique>(config); auto cb2 = std::make_unique>(config); EXPECT_CALL(*cb1, observerAttach(transport_.get())); @@ -430,8 +437,12 @@ TEST_F(QuicTransportTest, AppLimitedWithObservers) { transport_->writeChain( stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(1); - EXPECT_CALL(*cb1, appRateLimited(transport_.get())); - EXPECT_CALL(*cb2, appRateLimited(transport_.get())); + EXPECT_CALL(*cb1, startWritingFromAppLimited(transport_.get(), _)); + EXPECT_CALL(*cb1, packetsWritten(transport_.get(), _)); + EXPECT_CALL(*cb1, appRateLimited(transport_.get(), _)); + EXPECT_CALL(*cb2, startWritingFromAppLimited(transport_.get(), _)); + EXPECT_CALL(*cb2, packetsWritten(transport_.get(), _)); + EXPECT_CALL(*cb2, appRateLimited(transport_.get(), _)); loopForWrites(); Mock::VerifyAndClearExpectations(cb1.get()); Mock::VerifyAndClearExpectations(cb2.get()); diff --git a/quic/common/test/TestUtils.cpp b/quic/common/test/TestUtils.cpp index e3988ca12..f0768cf57 100644 --- a/quic/common/test/TestUtils.cpp +++ b/quic/common/test/TestUtils.cpp @@ -564,7 +564,8 @@ OutstandingPacket makeTestingWritePacket( size_t desiredSize, uint64_t totalBytesSent, TimePoint sentTime /* = Clock::now() */, - uint64_t inflightBytes /* = 0 */) { + uint64_t inflightBytes /* = 0 */, + uint64_t writeCount /* = 0 */) { LongHeader longHeader( LongHeader::Types::ZeroRtt, getTestConnectionId(1), @@ -580,7 +581,8 @@ OutstandingPacket makeTestingWritePacket( totalBytesSent, inflightBytes, 0, - LossState()); + LossState(), + writeCount); } CongestionController::AckEvent makeAck( diff --git a/quic/common/test/TestUtils.h b/quic/common/test/TestUtils.h index 4f55a65f7..590c14645 100644 --- a/quic/common/test/TestUtils.h +++ b/quic/common/test/TestUtils.h @@ -222,7 +222,8 @@ OutstandingPacket makeTestingWritePacket( size_t desiredSize, uint64_t totalBytesSent, TimePoint sentTime = Clock::now(), - uint64_t inflightBytes = 0); + uint64_t inflightBytes = 0, + uint64_t writeCount = 0); // TODO: The way we setup packet sent, ack, loss in test cases can use some // major refactor. diff --git a/quic/state/OutstandingPacket.h b/quic/state/OutstandingPacket.h index 7c6497042..956ff58cf 100644 --- a/quic/state/OutstandingPacket.h +++ b/quic/state/OutstandingPacket.h @@ -35,6 +35,9 @@ struct OutstandingPacketMetadata { uint32_t totalPacketsSent{0}; // Total number of ack-eliciting packets sent on this connection. uint32_t totalAckElicitingPacketsSent{0}; + // Write Count is the value of the monotonically increasing counter which + // tracks the number of writes on this socket. + uint64_t writeCount{0}; OutstandingPacketMetadata( TimePoint timeIn, @@ -44,7 +47,8 @@ struct OutstandingPacketMetadata { uint64_t totalBytesSentIn, uint64_t inflightBytesIn, uint64_t packetsInflightIn, - const LossState& lossStateIn) + const LossState& lossStateIn, + uint64_t writeCount) : time(timeIn), encodedSize(encodedSizeIn), isHandshake(isHandshakeIn), @@ -53,8 +57,8 @@ struct OutstandingPacketMetadata { inflightBytes(inflightBytesIn), packetsInflight(packetsInflightIn), totalPacketsSent(lossStateIn.totalPacketsSent), - totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent) { - } + totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent), + writeCount(writeCount) {} }; // Data structure to represent outstanding retransmittable packets @@ -119,7 +123,8 @@ struct OutstandingPacket { uint64_t totalBytesSentIn, uint64_t inflightBytesIn, uint64_t packetsInflightIn, - const LossState& lossStateIn) + const LossState& lossStateIn, + uint64_t writeCount = 0) : packet(std::move(packetIn)), metadata(OutstandingPacketMetadata( timeIn, @@ -129,7 +134,8 @@ struct OutstandingPacket { totalBytesSentIn, inflightBytesIn, packetsInflightIn, - lossStateIn)) {} + lossStateIn, + writeCount)) {} OutstandingPacket( RegularQuicWritePacket packetIn, @@ -140,7 +146,8 @@ struct OutstandingPacket { uint64_t totalBytesSentIn, uint64_t inflightBytesIn, uint64_t packetsInflightIn, - const LossState& lossStateIn) + const LossState& lossStateIn, + uint64_t writeCount = 0) : packet(std::move(packetIn)), metadata(OutstandingPacketMetadata( timeIn, @@ -150,6 +157,7 @@ struct OutstandingPacket { totalBytesSentIn, inflightBytesIn, packetsInflightIn, - lossStateIn)) {} + lossStateIn, + writeCount)) {} }; } // namespace quic diff --git a/quic/state/StateData.h b/quic/state/StateData.h index 44b3841ce..9e28289d4 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -821,6 +821,19 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction { // Whether a connection can be paced based on its handshake and close states. // For example, we may not want to pace a connection that's still handshaking. bool canBePaced{false}; + + // Flag indicating whether the socket is currently waiting for the app to + // write data. All new sockets start off in this state where they wait for the + // application to pump data to the socket. + // TODO: Merge this flag with the existing appLimited flag that exists on each + // Congestion Controller. + bool waitingForAppData{true}; + + // Monotonically increasing counter that is incremented each time there is a + // write on this socket (writeSocketData() is called), This is used to + // identify specific outstanding packets (based on writeCount and packetNum) + // in the Observers, to construct Write Blocks + uint64_t writeCount{0}; }; std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st); diff --git a/quic/tools/tperf/tperf.cpp b/quic/tools/tperf/tperf.cpp index 6aedf9ebc..00af5d348 100644 --- a/quic/tools/tperf/tperf.cpp +++ b/quic/tools/tperf/tperf.cpp @@ -134,7 +134,9 @@ ProbeSizeRaiserType parseRaiserType(uint32_t type) { class TPerfObserver : public Observer { public: TPerfObserver(const Observer::Config& config) : Observer(config) {} - void appRateLimited(QuicSocket* /* socket */) override { + void appRateLimited( + QuicSocket* /* socket */, + const quic::Observer::AppLimitedEvent& /* appLimitedEvent */) override { if (FLAGS_log_app_rate_limited) { LOG(INFO) << "appRateLimited detected"; }