mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-06 22:22:38 +03:00
Add support for detecting start and stop of app rate limited scenarios
Summary: Previously, we only had support for notifying observers when a QUIC socket became application rate limited. This change adds a similar notification when a socket does not become application rate limited, i.e when the application *starts* writing data to the socket - either for the first time on a newly established socket or after a previous block of writes were written and the app had no more to write. In addition, we include the number of outstanding packets (only those that are carrying app data in them) so that observers can use this data to timestamp the start and end of periods where the socket performs app data writes. Reviewed By: yangchi Differential Revision: D26559598 fbshipit-source-id: 0a8df7082b83e2ffad9b5addceca29cc03897243
This commit is contained in:
committed by
Facebook GitHub Bot
parent
3c350f5256
commit
d2f005dc00
@@ -37,6 +37,7 @@ class Observer {
|
||||
// following flags enable support for various callbacks.
|
||||
// observer and socket lifecycle callbacks are always enabled.
|
||||
bool evbEvents{false};
|
||||
bool appDataSentEvents{false};
|
||||
bool appLimitedEvents{false};
|
||||
bool lossEvents{false};
|
||||
bool spuriousLossEvents{false};
|
||||
@@ -50,6 +51,7 @@ class Observer {
|
||||
static Config getConfigAllEventsEnabled() {
|
||||
Config config = {};
|
||||
config.evbEvents = true;
|
||||
config.appDataSentEvents = true;
|
||||
config.appLimitedEvents = true;
|
||||
config.rttSamples = true;
|
||||
config.lossEvents = true;
|
||||
@@ -82,6 +84,24 @@ class Observer {
|
||||
return observerConfig_;
|
||||
}
|
||||
|
||||
struct AppLimitedEvent {
|
||||
AppLimitedEvent(
|
||||
const std::deque<OutstandingPacket>& outstandingPackets,
|
||||
const uint64_t writeCount)
|
||||
: outstandingPackets(outstandingPackets), writeCount(writeCount) {}
|
||||
|
||||
// Cannot support copy ctors safely
|
||||
AppLimitedEvent(const AppLimitedEvent&) = delete;
|
||||
AppLimitedEvent(AppLimitedEvent&&) = delete;
|
||||
|
||||
// Reference to the current list of outstanding packets
|
||||
const std::deque<OutstandingPacket>& outstandingPackets;
|
||||
// The current write number for the write() call that
|
||||
// caused this appLimitedEvent. Used by Observers to identify specific
|
||||
// packets from the outstandingPacket list (above).
|
||||
const uint64_t writeCount;
|
||||
};
|
||||
|
||||
struct LostPacket {
|
||||
explicit LostPacket(
|
||||
bool lostbytimeout,
|
||||
@@ -273,12 +293,52 @@ class Observer {
|
||||
QuicSocket* /* socket */,
|
||||
folly::EventBase* /* evb */) noexcept {}
|
||||
|
||||
/**
|
||||
* startWritingFromAppLimited() is invoked when the socket is currenty
|
||||
* app rate limited and is being asked to write a new set of Bytes.
|
||||
. This callback is invoked BEFORE we write the
|
||||
* new bytes to the socket, this is done so that Observers can collect
|
||||
* metadata about the connection BEFORE the start of a potential Write Block.
|
||||
*
|
||||
* @param socket Socket that has potentially sent application data.
|
||||
reference to the number of outstanding packets
|
||||
BEFORE the call to write() socket data.
|
||||
|
||||
*/
|
||||
virtual void startWritingFromAppLimited(
|
||||
QuicSocket* /* socket */,
|
||||
const AppLimitedEvent& /* appLimitedEvent */) {}
|
||||
|
||||
/**
|
||||
* packetsWritten() is invoked when the socket writes retransmittable packets
|
||||
* to the wire, those packets will be present in the outstanding packets list.
|
||||
* Observers can use this callback (which will always be invoked AFTER
|
||||
* startWritingFromAppLimited) to *update* the transport's metadata within the
|
||||
* currently tracked WriteBlock.
|
||||
*
|
||||
* If an Observer receives startWritingFromAppLimited but doesn't receive
|
||||
* packetsWritten (and instead directly receives appRateLimited), it means the
|
||||
* socket witnessed non-app data writes - this scenario is irrelevant and
|
||||
* should not be used to construct Write Blocks.
|
||||
*
|
||||
* @param socket Socket that has sent application data.
|
||||
* @param appLimitedEvent The AppLimitedEvent details which contains a const
|
||||
reference to the number of outstanding packets
|
||||
*/
|
||||
virtual void packetsWritten(
|
||||
QuicSocket* /* socket */,
|
||||
const AppLimitedEvent& /* appLimitedEvent */) {}
|
||||
|
||||
/**
|
||||
* appRateLimited() is invoked when the socket is app rate limited.
|
||||
*
|
||||
* @param socket Socket that has become application rate limited.
|
||||
* @param appLimitedEvent The AppLimitedEvent details which contains a const
|
||||
reference to the number of outstanding packets
|
||||
*/
|
||||
virtual void appRateLimited(QuicSocket* /* socket */) {}
|
||||
virtual void appRateLimited(
|
||||
QuicSocket* /* socket */,
|
||||
const AppLimitedEvent& /* appLimitedEvent */) {}
|
||||
|
||||
/**
|
||||
* packetLossDetected() is invoked when a packet loss is detected.
|
||||
|
@@ -145,6 +145,7 @@ class QuicSocket {
|
||||
uint64_t bytesSent{0};
|
||||
uint64_t bytesAcked{0};
|
||||
uint64_t bytesRecvd{0};
|
||||
uint64_t bytesInFlight{0};
|
||||
uint64_t totalBytesRetransmitted{0};
|
||||
uint32_t ptoCount{0};
|
||||
uint32_t totalPTOCount{0};
|
||||
|
@@ -602,6 +602,7 @@ QuicSocket::TransportInfo QuicTransportBase::getTransportInfo() const {
|
||||
transportInfo.bytesSent = conn_->lossState.totalBytesSent;
|
||||
transportInfo.bytesAcked = conn_->lossState.totalBytesAcked;
|
||||
transportInfo.bytesRecvd = conn_->lossState.totalBytesRecvd;
|
||||
transportInfo.bytesInFlight = conn_->lossState.inflightBytes;
|
||||
transportInfo.ptoCount = conn_->lossState.ptoCount;
|
||||
transportInfo.totalPTOCount = conn_->lossState.totalPTOCount;
|
||||
transportInfo.largestPacketAckedByPeer =
|
||||
@@ -2660,7 +2661,20 @@ QuicConnectionStats QuicTransportBase::getConnectionsStats() const {
|
||||
|
||||
void QuicTransportBase::writeSocketData() {
|
||||
if (socket_) {
|
||||
// record this invocation of a new write to the socket
|
||||
++(conn_->writeCount);
|
||||
auto packetsBefore = conn_->outstandings.numOutstanding();
|
||||
// Signal Observers the POTENTIAL start of a Write Block.
|
||||
if (conn_->waitingForAppData && conn_->congestionController) {
|
||||
Observer::AppLimitedEvent startWritingFromAppLimitedEvent(
|
||||
conn_->outstandings.packets, conn_->writeCount);
|
||||
for (const auto& cb : *observers_) {
|
||||
if (cb->getConfig().appDataSentEvents) {
|
||||
cb->startWritingFromAppLimited(this, startWritingFromAppLimitedEvent);
|
||||
}
|
||||
}
|
||||
conn_->waitingForAppData = false;
|
||||
}
|
||||
writeData();
|
||||
if (closeState_ != CloseState::CLOSED) {
|
||||
if (conn_->pendingEvents.closeTransport == true) {
|
||||
@@ -2671,6 +2685,18 @@ void QuicTransportBase::writeSocketData() {
|
||||
setLossDetectionAlarm(*conn_, *this);
|
||||
auto packetsAfter = conn_->outstandings.numOutstanding();
|
||||
bool packetWritten = (packetsAfter > packetsBefore);
|
||||
// Signal the Observers that *some* packets were written
|
||||
// These may/may not be app data packets, it it up to the Observers
|
||||
// to deal with these packets.
|
||||
if (packetWritten && conn_->congestionController) {
|
||||
Observer::AppLimitedEvent packetsWrittenEvent(
|
||||
conn_->outstandings.packets, conn_->writeCount);
|
||||
for (const auto& cb : *observers_) {
|
||||
if (cb->getConfig().appDataSentEvents) {
|
||||
cb->packetsWritten(this, packetsWrittenEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (conn_->loopDetectorCallback && packetWritten) {
|
||||
conn_->writeDebugState.currentEmptyLoopCount = 0;
|
||||
} else if (
|
||||
@@ -2706,11 +2732,14 @@ void QuicTransportBase::writeSocketData() {
|
||||
conn_->congestionController->setAppLimited();
|
||||
// notify via connection call and any observer callbacks
|
||||
connCallback_->onAppRateLimited();
|
||||
Observer::AppLimitedEvent appLimitedEvent(
|
||||
conn_->outstandings.packets, conn_->writeCount);
|
||||
for (const auto& cb : *observers_) {
|
||||
if (cb->getConfig().appLimitedEvents) {
|
||||
cb->appRateLimited(this);
|
||||
cb->appRateLimited(this, appLimitedEvent);
|
||||
}
|
||||
}
|
||||
conn_->waitingForAppData = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -666,6 +666,7 @@ void updateConnection(
|
||||
return;
|
||||
}
|
||||
conn.lossState.totalAckElicitingPacketsSent++;
|
||||
|
||||
auto packetIt =
|
||||
std::find_if(
|
||||
conn.outstandings.packets.rbegin(),
|
||||
@@ -688,7 +689,8 @@ void updateConnection(
|
||||
conn.lossState.totalBytesSent,
|
||||
conn.lossState.inflightBytes + encodedSize,
|
||||
conn.outstandings.numOutstanding() + 1,
|
||||
conn.lossState);
|
||||
conn.lossState,
|
||||
conn.writeCount);
|
||||
|
||||
if (isD6DProbe) {
|
||||
++conn.d6d.outstandingProbes;
|
||||
|
@@ -323,7 +323,24 @@ class MockObserver : public Observer {
|
||||
void(
|
||||
QuicSocket*,
|
||||
const folly::Optional<std::pair<QuicErrorCode, std::string>>&));
|
||||
GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*));
|
||||
GMOCK_METHOD2_(
|
||||
,
|
||||
noexcept,
|
||||
,
|
||||
startWritingFromAppLimited,
|
||||
void(QuicSocket*, const AppLimitedEvent&));
|
||||
GMOCK_METHOD2_(
|
||||
,
|
||||
noexcept,
|
||||
,
|
||||
packetsWritten,
|
||||
void(QuicSocket*, const AppLimitedEvent&));
|
||||
GMOCK_METHOD2_(
|
||||
,
|
||||
noexcept,
|
||||
,
|
||||
appRateLimited,
|
||||
void(QuicSocket*, const AppLimitedEvent&));
|
||||
GMOCK_METHOD2_(
|
||||
,
|
||||
noexcept,
|
||||
|
@@ -41,7 +41,7 @@ PacketNum addInitialOutstandingPacket(QuicConnectionStateBase& conn) {
|
||||
QuicVersion::QUIC_DRAFT);
|
||||
RegularQuicWritePacket packet(std::move(header));
|
||||
conn.outstandings.packets.emplace_back(
|
||||
packet, Clock::now(), 0, true, 0, 0, 0, LossState());
|
||||
packet, Clock::now(), 0, true, 0, 0, 0, LossState(), 0);
|
||||
conn.outstandings.handshakePacketsCount++;
|
||||
increaseNextPacketNum(conn, PacketNumberSpace::Handshake);
|
||||
return nextPacketNum;
|
||||
@@ -60,7 +60,7 @@ PacketNum addHandshakeOutstandingPacket(QuicConnectionStateBase& conn) {
|
||||
QuicVersion::QUIC_DRAFT);
|
||||
RegularQuicWritePacket packet(std::move(header));
|
||||
conn.outstandings.packets.emplace_back(
|
||||
packet, Clock::now(), 0, true, 0, 0, 0, LossState());
|
||||
packet, Clock::now(), 0, true, 0, 0, 0, LossState(), 0);
|
||||
conn.outstandings.handshakePacketsCount++;
|
||||
increaseNextPacketNum(conn, PacketNumberSpace::Handshake);
|
||||
return nextPacketNum;
|
||||
@@ -74,7 +74,7 @@ PacketNum addOutstandingPacket(QuicConnectionStateBase& conn) {
|
||||
nextPacketNum);
|
||||
RegularQuicWritePacket packet(std::move(header));
|
||||
conn.outstandings.packets.emplace_back(
|
||||
packet, Clock::now(), 0, false, 0, 0, 0, LossState());
|
||||
packet, Clock::now(), 0, false, 0, 0, 0, LossState(), 0);
|
||||
increaseNextPacketNum(conn, PacketNumberSpace::AppData);
|
||||
return nextPacketNum;
|
||||
}
|
||||
|
@@ -359,6 +359,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) {
|
||||
|
||||
Observer::Config config = {};
|
||||
config.appLimitedEvents = true;
|
||||
config.appDataSentEvents = true;
|
||||
auto cb = std::make_unique<StrictMock<MockObserver>>(config);
|
||||
|
||||
EXPECT_CALL(*cb, observerAttach(transport_.get()));
|
||||
@@ -367,7 +368,9 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) {
|
||||
auto stream = transport_->createBidirectionalStream().value();
|
||||
transport_->writeChain(
|
||||
stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
|
||||
EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0);
|
||||
EXPECT_CALL(*cb, startWritingFromAppLimited(transport_.get(), _));
|
||||
EXPECT_CALL(*cb, packetsWritten(transport_.get(), _));
|
||||
EXPECT_CALL(*cb, appRateLimited(transport_.get(), _)).Times(0);
|
||||
loopForWrites();
|
||||
Mock::VerifyAndClearExpectations(cb.get());
|
||||
EXPECT_CALL(*cb, close(transport_.get(), _)).Times(3);
|
||||
@@ -389,6 +392,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) {
|
||||
|
||||
Observer::Config config = {};
|
||||
config.appLimitedEvents = true;
|
||||
config.appDataSentEvents = true;
|
||||
auto cb = std::make_unique<StrictMock<MockObserver>>(config);
|
||||
|
||||
EXPECT_CALL(*cb, observerAttach(transport_.get()));
|
||||
@@ -397,7 +401,9 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) {
|
||||
auto stream = transport_->createBidirectionalStream().value();
|
||||
auto buf = buildRandomInputData(100 * 2000);
|
||||
transport_->writeChain(stream, buf->clone(), false, nullptr);
|
||||
EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0);
|
||||
EXPECT_CALL(*cb, startWritingFromAppLimited(transport_.get(), _));
|
||||
EXPECT_CALL(*cb, packetsWritten(transport_.get(), _));
|
||||
EXPECT_CALL(*cb, appRateLimited(transport_.get(), _)).Times(0);
|
||||
loopForWrites();
|
||||
Mock::VerifyAndClearExpectations(cb.get());
|
||||
EXPECT_CALL(*cb, close(transport_.get(), _)).Times(3);
|
||||
@@ -410,6 +416,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) {
|
||||
TEST_F(QuicTransportTest, AppLimitedWithObservers) {
|
||||
Observer::Config config = {};
|
||||
config.appLimitedEvents = true;
|
||||
config.appDataSentEvents = true;
|
||||
auto cb1 = std::make_unique<StrictMock<MockObserver>>(config);
|
||||
auto cb2 = std::make_unique<StrictMock<MockObserver>>(config);
|
||||
EXPECT_CALL(*cb1, observerAttach(transport_.get()));
|
||||
@@ -430,8 +437,12 @@ TEST_F(QuicTransportTest, AppLimitedWithObservers) {
|
||||
transport_->writeChain(
|
||||
stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(1);
|
||||
EXPECT_CALL(*cb1, appRateLimited(transport_.get()));
|
||||
EXPECT_CALL(*cb2, appRateLimited(transport_.get()));
|
||||
EXPECT_CALL(*cb1, startWritingFromAppLimited(transport_.get(), _));
|
||||
EXPECT_CALL(*cb1, packetsWritten(transport_.get(), _));
|
||||
EXPECT_CALL(*cb1, appRateLimited(transport_.get(), _));
|
||||
EXPECT_CALL(*cb2, startWritingFromAppLimited(transport_.get(), _));
|
||||
EXPECT_CALL(*cb2, packetsWritten(transport_.get(), _));
|
||||
EXPECT_CALL(*cb2, appRateLimited(transport_.get(), _));
|
||||
loopForWrites();
|
||||
Mock::VerifyAndClearExpectations(cb1.get());
|
||||
Mock::VerifyAndClearExpectations(cb2.get());
|
||||
|
@@ -564,7 +564,8 @@ OutstandingPacket makeTestingWritePacket(
|
||||
size_t desiredSize,
|
||||
uint64_t totalBytesSent,
|
||||
TimePoint sentTime /* = Clock::now() */,
|
||||
uint64_t inflightBytes /* = 0 */) {
|
||||
uint64_t inflightBytes /* = 0 */,
|
||||
uint64_t writeCount /* = 0 */) {
|
||||
LongHeader longHeader(
|
||||
LongHeader::Types::ZeroRtt,
|
||||
getTestConnectionId(1),
|
||||
@@ -580,7 +581,8 @@ OutstandingPacket makeTestingWritePacket(
|
||||
totalBytesSent,
|
||||
inflightBytes,
|
||||
0,
|
||||
LossState());
|
||||
LossState(),
|
||||
writeCount);
|
||||
}
|
||||
|
||||
CongestionController::AckEvent makeAck(
|
||||
|
@@ -222,7 +222,8 @@ OutstandingPacket makeTestingWritePacket(
|
||||
size_t desiredSize,
|
||||
uint64_t totalBytesSent,
|
||||
TimePoint sentTime = Clock::now(),
|
||||
uint64_t inflightBytes = 0);
|
||||
uint64_t inflightBytes = 0,
|
||||
uint64_t writeCount = 0);
|
||||
|
||||
// TODO: The way we setup packet sent, ack, loss in test cases can use some
|
||||
// major refactor.
|
||||
|
@@ -35,6 +35,9 @@ struct OutstandingPacketMetadata {
|
||||
uint32_t totalPacketsSent{0};
|
||||
// Total number of ack-eliciting packets sent on this connection.
|
||||
uint32_t totalAckElicitingPacketsSent{0};
|
||||
// Write Count is the value of the monotonically increasing counter which
|
||||
// tracks the number of writes on this socket.
|
||||
uint64_t writeCount{0};
|
||||
|
||||
OutstandingPacketMetadata(
|
||||
TimePoint timeIn,
|
||||
@@ -44,7 +47,8 @@ struct OutstandingPacketMetadata {
|
||||
uint64_t totalBytesSentIn,
|
||||
uint64_t inflightBytesIn,
|
||||
uint64_t packetsInflightIn,
|
||||
const LossState& lossStateIn)
|
||||
const LossState& lossStateIn,
|
||||
uint64_t writeCount)
|
||||
: time(timeIn),
|
||||
encodedSize(encodedSizeIn),
|
||||
isHandshake(isHandshakeIn),
|
||||
@@ -53,8 +57,8 @@ struct OutstandingPacketMetadata {
|
||||
inflightBytes(inflightBytesIn),
|
||||
packetsInflight(packetsInflightIn),
|
||||
totalPacketsSent(lossStateIn.totalPacketsSent),
|
||||
totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent) {
|
||||
}
|
||||
totalAckElicitingPacketsSent(lossStateIn.totalAckElicitingPacketsSent),
|
||||
writeCount(writeCount) {}
|
||||
};
|
||||
|
||||
// Data structure to represent outstanding retransmittable packets
|
||||
@@ -119,7 +123,8 @@ struct OutstandingPacket {
|
||||
uint64_t totalBytesSentIn,
|
||||
uint64_t inflightBytesIn,
|
||||
uint64_t packetsInflightIn,
|
||||
const LossState& lossStateIn)
|
||||
const LossState& lossStateIn,
|
||||
uint64_t writeCount = 0)
|
||||
: packet(std::move(packetIn)),
|
||||
metadata(OutstandingPacketMetadata(
|
||||
timeIn,
|
||||
@@ -129,7 +134,8 @@ struct OutstandingPacket {
|
||||
totalBytesSentIn,
|
||||
inflightBytesIn,
|
||||
packetsInflightIn,
|
||||
lossStateIn)) {}
|
||||
lossStateIn,
|
||||
writeCount)) {}
|
||||
|
||||
OutstandingPacket(
|
||||
RegularQuicWritePacket packetIn,
|
||||
@@ -140,7 +146,8 @@ struct OutstandingPacket {
|
||||
uint64_t totalBytesSentIn,
|
||||
uint64_t inflightBytesIn,
|
||||
uint64_t packetsInflightIn,
|
||||
const LossState& lossStateIn)
|
||||
const LossState& lossStateIn,
|
||||
uint64_t writeCount = 0)
|
||||
: packet(std::move(packetIn)),
|
||||
metadata(OutstandingPacketMetadata(
|
||||
timeIn,
|
||||
@@ -150,6 +157,7 @@ struct OutstandingPacket {
|
||||
totalBytesSentIn,
|
||||
inflightBytesIn,
|
||||
packetsInflightIn,
|
||||
lossStateIn)) {}
|
||||
lossStateIn,
|
||||
writeCount)) {}
|
||||
};
|
||||
} // namespace quic
|
||||
|
@@ -821,6 +821,19 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
|
||||
// Whether a connection can be paced based on its handshake and close states.
|
||||
// For example, we may not want to pace a connection that's still handshaking.
|
||||
bool canBePaced{false};
|
||||
|
||||
// Flag indicating whether the socket is currently waiting for the app to
|
||||
// write data. All new sockets start off in this state where they wait for the
|
||||
// application to pump data to the socket.
|
||||
// TODO: Merge this flag with the existing appLimited flag that exists on each
|
||||
// Congestion Controller.
|
||||
bool waitingForAppData{true};
|
||||
|
||||
// Monotonically increasing counter that is incremented each time there is a
|
||||
// write on this socket (writeSocketData() is called), This is used to
|
||||
// identify specific outstanding packets (based on writeCount and packetNum)
|
||||
// in the Observers, to construct Write Blocks
|
||||
uint64_t writeCount{0};
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st);
|
||||
|
@@ -134,7 +134,9 @@ ProbeSizeRaiserType parseRaiserType(uint32_t type) {
|
||||
class TPerfObserver : public Observer {
|
||||
public:
|
||||
TPerfObserver(const Observer::Config& config) : Observer(config) {}
|
||||
void appRateLimited(QuicSocket* /* socket */) override {
|
||||
void appRateLimited(
|
||||
QuicSocket* /* socket */,
|
||||
const quic::Observer::AppLimitedEvent& /* appLimitedEvent */) override {
|
||||
if (FLAGS_log_app_rate_limited) {
|
||||
LOG(INFO) << "appRateLimited detected";
|
||||
}
|
||||
|
Reference in New Issue
Block a user