1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Use separate flags to enable app data sent callbacks

Summary:
Previously we were using a single flag appDataSentEvents to enable 3 callbacks
(startWritingFromAppLimited, packetsWritten and appRateLimited). This commit
creates separate flags to enable each of these callbacks, we have use-cases
where Observers might want only packetsWritten events and not the others.

Also, this commit refactors the logic to deliver these 3 callbacks to all
observers into a separate method so they can be unit tested easily.

Reviewed By: bschlinker

Differential Revision: D27925011

fbshipit-source-id: 7f7436dfc3d50c3abcb8ec121b221d20e30b0c4b
This commit is contained in:
Sridhar Srinivasan
2021-05-02 23:06:59 -07:00
committed by Facebook GitHub Bot
parent 1bfb6b25e3
commit 3a4783713a
6 changed files with 74 additions and 38 deletions

View File

@@ -34,30 +34,36 @@ class Observer {
* Specifies events observer wants to receive. * Specifies events observer wants to receive.
*/ */
struct Config { struct Config {
virtual ~Config() = default;
// following flags enable support for various callbacks. // following flags enable support for various callbacks.
// observer and socket lifecycle callbacks are always enabled. // observer and socket lifecycle callbacks are always enabled.
bool evbEvents{false}; bool evbEvents{false};
bool appDataSentEvents{false}; bool packetsWrittenEvents{false};
bool appLimitedEvents{false}; bool appRateLimitedEvents{false};
bool lossEvents{false}; bool lossEvents{false};
bool spuriousLossEvents{false}; bool spuriousLossEvents{false};
bool pmtuEvents{false}; bool pmtuEvents{false};
bool rttSamples{false}; bool rttSamples{false};
bool knobFrameEvents{false}; bool knobFrameEvents{false};
virtual void enableAllEvents() {
evbEvents = true;
packetsWrittenEvents = true;
appRateLimitedEvents = true;
rttSamples = true;
lossEvents = true;
spuriousLossEvents = true;
pmtuEvents = true;
knobFrameEvents = true;
}
/** /**
* Returns a config where all events are enabled. * Returns a config where all events are enabled.
*/ */
static Config getConfigAllEventsEnabled() { static Config getConfigAllEventsEnabled() {
Config config = {}; Config config = {};
config.evbEvents = true; config.enableAllEvents();
config.appDataSentEvents = true;
config.appLimitedEvents = true;
config.rttSamples = true;
config.lossEvents = true;
config.spuriousLossEvents = true;
config.pmtuEvents = true;
config.knobFrameEvents = true;
return config; return config;
} }
}; };

View File

@@ -2726,13 +2726,7 @@ void QuicTransportBase::writeSocketData() {
auto packetsBefore = conn_->outstandings.numOutstanding(); auto packetsBefore = conn_->outstandings.numOutstanding();
// Signal Observers the POTENTIAL start of a Write Block. // Signal Observers the POTENTIAL start of a Write Block.
if (conn_->waitingForAppData && conn_->congestionController) { if (conn_->waitingForAppData && conn_->congestionController) {
Observer::AppLimitedEvent startWritingFromAppLimitedEvent( notifyStartWritingFromAppRateLimited();
conn_->outstandings.packets, conn_->writeCount);
for (const auto& cb : *observers_) {
if (cb->getConfig().appDataSentEvents) {
cb->startWritingFromAppLimited(this, startWritingFromAppLimitedEvent);
}
}
conn_->waitingForAppData = false; conn_->waitingForAppData = false;
} }
writeData(); writeData();
@@ -2749,13 +2743,7 @@ void QuicTransportBase::writeSocketData() {
// These may/may not be app data packets, it it up to the Observers // These may/may not be app data packets, it it up to the Observers
// to deal with these packets. // to deal with these packets.
if (packetWritten && conn_->congestionController) { if (packetWritten && conn_->congestionController) {
Observer::AppLimitedEvent packetsWrittenEvent( notifyPacketsWritten();
conn_->outstandings.packets, conn_->writeCount);
for (const auto& cb : *observers_) {
if (cb->getConfig().appDataSentEvents) {
cb->packetsWritten(this, packetsWrittenEvent);
}
}
} }
if (conn_->loopDetectorCallback && packetWritten) { if (conn_->loopDetectorCallback && packetWritten) {
conn_->writeDebugState.currentEmptyLoopCount = 0; conn_->writeDebugState.currentEmptyLoopCount = 0;
@@ -2792,13 +2780,7 @@ void QuicTransportBase::writeSocketData() {
conn_->congestionController->setAppLimited(); conn_->congestionController->setAppLimited();
// notify via connection call and any observer callbacks // notify via connection call and any observer callbacks
connCallback_->onAppRateLimited(); connCallback_->onAppRateLimited();
Observer::AppLimitedEvent appLimitedEvent( notifyAppRateLimited();
conn_->outstandings.packets, conn_->writeCount);
for (const auto& cb : *observers_) {
if (cb->getConfig().appLimitedEvents) {
cb->appRateLimited(this, appLimitedEvent);
}
}
conn_->waitingForAppData = true; conn_->waitingForAppData = true;
} }
} }
@@ -3237,4 +3219,34 @@ QuicSocket::WriteResult QuicTransportBase::setDSRPacketizationRequestSender(
return folly::unit; return folly::unit;
} }
void QuicTransportBase::notifyStartWritingFromAppRateLimited() {
Observer::AppLimitedEvent startWritingFromAppLimitedEvent(
conn_->outstandings.packets, conn_->writeCount);
for (const auto& cb : *observers_) {
if (cb->getConfig().appRateLimitedEvents) {
cb->startWritingFromAppLimited(this, startWritingFromAppLimitedEvent);
}
}
}
void QuicTransportBase::notifyPacketsWritten() {
Observer::AppLimitedEvent packetsWrittenEvent(
conn_->outstandings.packets, conn_->writeCount);
for (const auto& cb : *observers_) {
if (cb->getConfig().packetsWrittenEvents) {
cb->packetsWritten(this, packetsWrittenEvent);
}
}
}
void QuicTransportBase::notifyAppRateLimited() {
Observer::AppLimitedEvent appRateLimitedEvent(
conn_->outstandings.packets, conn_->writeCount);
for (const auto& cb : *observers_) {
if (cb->getConfig().appRateLimitedEvents) {
cb->appRateLimited(this, appRateLimitedEvent);
}
}
}
} // namespace quic } // namespace quic

View File

@@ -718,6 +718,12 @@ class QuicTransportBase : public QuicSocket {
void validateCongestionAndPacing(CongestionControlType& type); void validateCongestionAndPacing(CongestionControlType& type);
// Helpers to notify all registered observers about specific events during
// socket write (if enabled in the observer's config).
void notifyStartWritingFromAppRateLimited();
void notifyPacketsWritten();
void notifyAppRateLimited();
/** /**
* Callback when we receive a transport knob * Callback when we receive a transport knob
*/ */

View File

@@ -357,8 +357,8 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) {
})); }));
Observer::Config config = {}; Observer::Config config = {};
config.appLimitedEvents = true; config.packetsWrittenEvents = true;
config.appDataSentEvents = true; config.appRateLimitedEvents = true;
auto cb = std::make_unique<StrictMock<MockObserver>>(config); auto cb = std::make_unique<StrictMock<MockObserver>>(config);
EXPECT_CALL(*cb, observerAttach(transport_.get())); EXPECT_CALL(*cb, observerAttach(transport_.get()));
@@ -390,8 +390,8 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) {
.WillRepeatedly(Return(5000)); .WillRepeatedly(Return(5000));
Observer::Config config = {}; Observer::Config config = {};
config.appLimitedEvents = true; config.packetsWrittenEvents = true;
config.appDataSentEvents = true; config.appRateLimitedEvents = true;
auto cb = std::make_unique<StrictMock<MockObserver>>(config); auto cb = std::make_unique<StrictMock<MockObserver>>(config);
EXPECT_CALL(*cb, observerAttach(transport_.get())); EXPECT_CALL(*cb, observerAttach(transport_.get()));
@@ -414,8 +414,8 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) {
TEST_F(QuicTransportTest, AppLimitedWithObservers) { TEST_F(QuicTransportTest, AppLimitedWithObservers) {
Observer::Config config = {}; Observer::Config config = {};
config.appLimitedEvents = true; config.packetsWrittenEvents = true;
config.appDataSentEvents = true; config.appRateLimitedEvents = true;
auto cb1 = std::make_unique<StrictMock<MockObserver>>(config); auto cb1 = std::make_unique<StrictMock<MockObserver>>(config);
auto cb2 = std::make_unique<StrictMock<MockObserver>>(config); auto cb2 = std::make_unique<StrictMock<MockObserver>>(config);
EXPECT_CALL(*cb1, observerAttach(transport_.get())); EXPECT_CALL(*cb1, observerAttach(transport_.get()));

View File

@@ -132,6 +132,18 @@ class TestQuicTransport
setIdleTimer(); setIdleTimer();
} }
void invokeNotifyStartWritingFromAppRateLimited() {
notifyStartWritingFromAppRateLimited();
}
void invokeNotifyPacketsWritten() {
notifyPacketsWritten();
}
void invokeNotifyAppRateLimited() {
notifyAppRateLimited();
}
std::unique_ptr<Aead> aead; std::unique_ptr<Aead> aead;
std::unique_ptr<PacketNumberCipher> headerCipher; std::unique_ptr<PacketNumberCipher> headerCipher;
bool closed{false}; bool closed{false};

View File

@@ -194,7 +194,7 @@ class TPerfAcceptObserver : public AcceptObserver {
// Create an observer config, only enabling events we are interested in // Create an observer config, only enabling events we are interested in
// receiving. // receiving.
Observer::Config config = {}; Observer::Config config = {};
config.appLimitedEvents = true; config.appRateLimitedEvents = true;
config.pmtuEvents = true; config.pmtuEvents = true;
config.rttSamples = true; config.rttSamples = true;
config.lossEvents = true; config.lossEvents = true;