1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Migrate the QUIC and TransportMonitor libraries to use the new unified Observer callback class

Summary:
We were using the LifecycleObserver and InstrumentationObserver classes
separately, to generate and receive callbacks.

This change migrates both these to use the unified Observer callback class and
adjusts the unit tests.

Reviewed By: bschlinker

Differential Revision: D25845845

fbshipit-source-id: c489400f5d70bccadbcc1d957136c5ade36b65ff
This commit is contained in:
Sridhar Srinivasan
2021-01-13 15:33:33 -08:00
committed by Facebook GitHub Bot
parent c29f98abae
commit 27fe474171
20 changed files with 261 additions and 450 deletions

View File

@@ -253,6 +253,10 @@ class Observer {
const PMTUUpperBoundEvent& /* pmtuUpperBoundEvent */) {}
};
// Container for instrumentation observers.
// Avoids heap allocation for up to 2 observers being installed.
using ObserverVec = SmallVec<Observer*, 2>;
/**
* ===== Instrumentation Observer API =====
*/

View File

@@ -1112,64 +1112,32 @@ class QuicSocket {
*/
virtual void setCongestionControl(CongestionControlType type) = 0;
// Container for lifecycle observers.
// Avoids heap allocation for up to 2 observers being installed.
using LifecycleObserverVec = SmallVec<LifecycleObserver*, 2>;
/**
* Adds a lifecycle observer.
* Adds an observer.
*
* Observers can tie their lifetime to aspects of this socket's lifecycle /
* Observers can tie their lifetime to aspects of this socket's /
* lifetime and perform inspection at various states.
*
* This enables instrumentation to be added without changing / interfering
* with how the application uses the socket.
*
* @param observer Observer to add (implements LifecycleObserver).
* @param observer Observer to add (implements Observer).
*/
virtual void addLifecycleObserver(LifecycleObserver* observer) = 0;
virtual void addObserver(Observer* observer) = 0;
/**
* Removes a lifecycle observer.
* Removes an observer.
*
* @param observer Observer to remove.
* @return Whether observer found and removed from list.
*/
virtual bool removeLifecycleObserver(LifecycleObserver* observer) = 0;
virtual bool removeObserver(Observer* observer) = 0;
/**
* Returns installed lifecycle observers.
* Returns installed observers.
*
* @return Reference to const vector with installed observers.
*/
FOLLY_NODISCARD virtual const LifecycleObserverVec& getLifecycleObservers()
const = 0;
/**
* Adds a instrumentation observer.
*
* Instrumentation observers get notified of various socket events.
*
* @param observer Observer to add (implements InstrumentationObserver).
*/
virtual void addInstrumentationObserver(
InstrumentationObserver* observer) = 0;
/**
* Removes a instrumentation observer.
*
* @param observer Observer to remove.
* @return Whether observer found and removed from list.
*/
virtual bool removeInstrumentationObserver(
InstrumentationObserver* observer) = 0;
/**
* Returns installed instrumentation observers.
*
* @return Reference to const vector with installed observers.
*/
FOLLY_NODISCARD virtual const InstrumentationObserverVec&
getInstrumentationObservers() const = 0;
FOLLY_NODISCARD virtual const ObserverVec& getObservers() const = 0;
};
} // namespace quic

View File

@@ -140,6 +140,7 @@ const folly::SocketAddress& QuicTransportBase::getLocalAddress() const {
QuicTransportBase::~QuicTransportBase() {
connCallback_ = nullptr;
QUIC_TRACE(
conn_close,
*conn_,
@@ -161,7 +162,7 @@ QuicTransportBase::~QuicTransportBase() {
sock->pauseRead();
sock->close();
}
for (const auto& cb : lifecycleObservers_) {
for (const auto& cb : *observers_) {
cb->destroy(this);
}
}
@@ -257,13 +258,9 @@ void QuicTransportBase::closeImpl(
folly::Optional<std::pair<QuicErrorCode, std::string>> errorCode,
bool drainConnection,
bool sendCloseImmediately) {
for (const auto& cb : lifecycleObservers_) {
for (const auto& cb : *observers_) {
cb->close(this, errorCode);
}
for (const auto& cb : conn_->instrumentationObservers_) {
cb->observerDetach(this);
}
conn_->instrumentationObservers_.clear();
if (closeState_ == CloseState::CLOSED) {
return;
@@ -2673,56 +2670,27 @@ void QuicTransportBase::cancelAllAppCallbacks(
}
}
void QuicTransportBase::addLifecycleObserver(LifecycleObserver* observer) {
lifecycleObservers_.push_back(CHECK_NOTNULL(observer));
void QuicTransportBase::addObserver(Observer* observer) {
observers_->push_back(CHECK_NOTNULL(observer));
observer->observerAttach(this);
}
bool QuicTransportBase::removeLifecycleObserver(LifecycleObserver* observer) {
const auto eraseIt = std::remove(
lifecycleObservers_.begin(), lifecycleObservers_.end(), observer);
if (eraseIt == lifecycleObservers_.end()) {
bool QuicTransportBase::removeObserver(Observer* observer) {
const auto eraseIt =
std::remove(observers_->begin(), observers_->end(), observer);
if (eraseIt == observers_->end()) {
return false;
}
for (auto it = eraseIt; it != lifecycleObservers_.end(); it++) {
for (auto it = eraseIt; it != observers_->end(); it++) {
(*it)->observerDetach(this);
}
lifecycleObservers_.erase(eraseIt, lifecycleObservers_.end());
observers_->erase(eraseIt, observers_->end());
return true;
}
const QuicTransportBase::LifecycleObserverVec&
QuicTransportBase::getLifecycleObservers() const {
return lifecycleObservers_;
}
void QuicTransportBase::addInstrumentationObserver(
InstrumentationObserver* observer) {
conn_->instrumentationObservers_.push_back(CHECK_NOTNULL(observer));
}
bool QuicTransportBase::removeInstrumentationObserver(
InstrumentationObserver* observer) {
const auto eraseIt = std::remove(
conn_->instrumentationObservers_.begin(),
conn_->instrumentationObservers_.end(),
observer);
if (eraseIt == conn_->instrumentationObservers_.end()) {
return false;
}
for (auto it = eraseIt; it != conn_->instrumentationObservers_.end(); it++) {
(*it)->observerDetach(this);
}
conn_->instrumentationObservers_.erase(
eraseIt, conn_->instrumentationObservers_.end());
return true;
}
const InstrumentationObserverVec&
QuicTransportBase::getInstrumentationObservers() const {
return conn_->instrumentationObservers_;
const ObserverVec& QuicTransportBase::getObservers() const {
return *observers_;
}
void QuicTransportBase::writeSocketData() {
@@ -2771,9 +2739,9 @@ void QuicTransportBase::writeSocketData() {
currentSendBufLen < conn_->udpSendPacketLen && lossBufferEmpty &&
conn_->congestionController->getWritableBytes()) {
conn_->congestionController->setAppLimited();
// notify via connection call and any instrumentation callbacks
// notify via connection call and any observer callbacks
connCallback_->onAppRateLimited();
for (const auto& cb : conn_->instrumentationObservers_) {
for (const auto& cb : *observers_) {
cb->appRateLimited(this);
}
}
@@ -2979,7 +2947,7 @@ void QuicTransportBase::attachEventBase(folly::EventBase* evb) {
updatePeekLooper();
updateWriteLooper(false);
for (const auto& cb : lifecycleObservers_) {
for (const auto& cb : *observers_) {
cb->evbAttach(this, evb_);
}
}
@@ -3001,7 +2969,7 @@ void QuicTransportBase::detachEventBase() {
peekLooper_->detachEventBase();
writeLooper_->detachEventBase();
for (const auto& cb : lifecycleObservers_) {
for (const auto& cb : *observers_) {
cb->evbDetach(this, evb_);
}
evb_ = nullptr;

View File

@@ -590,7 +590,7 @@ class QuicTransportBase : public QuicSocket {
const std::pair<QuicErrorCode, folly::StringPiece>& error) noexcept;
/**
* Adds a lifecycle observer.
* Adds an observer.
*
* Observers can tie their lifetime to aspects of this socket's lifecycle /
* lifetime and perform inspection at various states.
@@ -598,51 +598,24 @@ class QuicTransportBase : public QuicSocket {
* This enables instrumentation to be added without changing / interfering
* with how the application uses the socket.
*
* @param observer Observer to add (implements LifecycleObserver).
* @param observer Observer to add (implements Observer).
*/
void addLifecycleObserver(LifecycleObserver* observer) override;
void addObserver(Observer* observer) override;
/**
* Removes a lifecycle observer.
* Removes an observer.
*
* @param observer Observer to remove.
* @return Whether observer found and removed from list.
*/
bool removeLifecycleObserver(LifecycleObserver* observer) override;
bool removeObserver(Observer* observer) override;
/**
* Returns installed lifecycle observers.
* Returns installed observers.
*
* @return Reference to const vector with installed observers.
*/
FOLLY_NODISCARD const LifecycleObserverVec& getLifecycleObservers()
const override;
/**
* Adds a instrumentation observer.
*
* Instrumentation observers get notified of various socket events.
*
* @param observer Observer to add (implements InstrumentationObserver).
*/
void addInstrumentationObserver(InstrumentationObserver* observer) override;
/**
* Removes a instrumentation observer.
*
* @param observer Observer to remove.
* @return Whether observer found and removed from list.
*/
bool removeInstrumentationObserver(
InstrumentationObserver* observer) override;
/**
* Returns installed instrumentation observers.
*
* @return Reference to const vector with installed observers.
*/
FOLLY_NODISCARD const InstrumentationObserverVec&
getInstrumentationObservers() const override;
FOLLY_NODISCARD const ObserverVec& getObservers() const override;
protected:
void updateCongestionControlSettings(
@@ -846,11 +819,9 @@ class QuicTransportBase : public QuicSocket {
folly::Optional<std::string> exceptionCloseWhat_;
// Lifecycle observers
LifecycleObserverVec lifecycleObservers_;
// Observers
// Instrumentation observers
InstrumentationObserverVec instrumentationObservers_;
std::shared_ptr<ObserverVec> observers_{std::make_shared<ObserverVec>()};
uint64_t qlogRefcnt_{0};
};

View File

@@ -285,14 +285,8 @@ class MockQuicSocket : public QuicSocket {
earlyDataAppParamsValidator_;
folly::Function<Buf()> earlyDataAppParamsGetter_;
MOCK_METHOD1(addLifecycleObserver, void(LifecycleObserver*));
MOCK_METHOD1(removeLifecycleObserver, bool(LifecycleObserver*));
MOCK_CONST_METHOD0(getLifecycleObservers, const LifecycleObserverVec&());
MOCK_METHOD1(addInstrumentationObserver, void(InstrumentationObserver*));
MOCK_METHOD1(removeInstrumentationObserver, bool(InstrumentationObserver*));
MOCK_CONST_METHOD0(
getInstrumentationObservers,
const InstrumentationObserverVec&());
MOCK_METHOD1(addObserver, void(Observer*));
MOCK_METHOD1(removeObserver, bool(Observer*));
MOCK_CONST_METHOD0(getObservers, const ObserverVec&());
};
} // namespace quic

View File

@@ -307,7 +307,7 @@ class MockLoopDetectorCallback : public LoopDetectorCallback {
MOCK_METHOD2(onSuspiciousReadLoops, void(uint64_t, NoReadReason));
};
class MockLifecycleObserver : public LifecycleObserver {
class MockObserver : public Observer {
public:
GMOCK_METHOD1_(, noexcept, , observerAttach, void(QuicSocket*));
GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*));
@@ -322,18 +322,13 @@ class MockLifecycleObserver : public LifecycleObserver {
void(
QuicSocket*,
const folly::Optional<std::pair<QuicErrorCode, std::string>>&));
};
class MockInstrumentationObserver : public InstrumentationObserver {
public:
GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*));
GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*));
GMOCK_METHOD2_(
,
noexcept,
,
packetLossDetected,
void(QuicSocket*, const ObserverLossEvent&));
void(QuicSocket*, const LossEvent&));
GMOCK_METHOD2_(
,
noexcept,
@@ -357,11 +352,10 @@ class MockInstrumentationObserver : public InstrumentationObserver {
static auto getLossPacketMatcher(bool reorderLoss, bool timeoutLoss) {
return AllOf(
testing::Field(
&InstrumentationObserver::LostPacket::lostByReorderThreshold,
&Observer::LostPacket::lostByReorderThreshold,
testing::Eq(reorderLoss)),
testing::Field(
&InstrumentationObserver::LostPacket::lostByTimeout,
testing::Eq(timeoutLoss)));
&Observer::LostPacket::lostByTimeout, testing::Eq(timeoutLoss)));
}
};

View File

@@ -3386,30 +3386,28 @@ TEST_F(QuicTransportImplTest, StreamWriteCallbackUnregister) {
evb->loopOnce();
}
TEST_F(QuicTransportImplTest, LifecycleObserverAttachRemove) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverAttachRemove) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addLifecycleObserver(cb.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeLifecycleObserver(cb.get()));
EXPECT_TRUE(transport->removeObserver(cb.get()));
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_THAT(transport->getLifecycleObservers(), IsEmpty());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_F(QuicTransportImplTest, LifecycleObserverRemoveMissing) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
EXPECT_FALSE(transport->removeLifecycleObserver(cb.get()));
EXPECT_THAT(transport->getLifecycleObservers(), IsEmpty());
TEST_F(QuicTransportImplTest, ObserverRemoveMissing) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_FALSE(transport->removeObserver(cb.get()));
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_F(QuicTransportImplTest, LifecycleObserverDestroyTransport) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverDestroyTransport) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addLifecycleObserver(cb.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
InSequence s;
EXPECT_CALL(*cb, close(transport.get(), _)).Times(2);
EXPECT_CALL(*cb, destroy(transport.get()));
@@ -3417,14 +3415,11 @@ TEST_F(QuicTransportImplTest, LifecycleObserverDestroyTransport) {
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(
QuicTransportImplTest,
LifecycleObserverCloseNoErrorThenDestroyTransport) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverCloseNoErrorThenDestroyTransport) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addLifecycleObserver(cb.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
const std::pair<QuicErrorCode, std::string> defaultError = std::make_pair(
GenericApplicationErrorCode::NO_ERROR,
@@ -3444,14 +3439,11 @@ TEST_F(
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(
QuicTransportImplTest,
LifecycleObserverCloseWithErrorThenDestroyTransport) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverCloseWithErrorThenDestroyTransport) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addLifecycleObserver(cb.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
const auto testError = std::make_pair(
QuicErrorCode(LocalErrorCode::CONNECTION_RESET),
@@ -3470,100 +3462,90 @@ TEST_F(
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(QuicTransportImplTest, LifecycleObserverDetachObserverImmediately) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverDetachObserverImmediately) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addLifecycleObserver(cb.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeLifecycleObserver(cb.get()));
EXPECT_TRUE(transport->removeObserver(cb.get()));
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_THAT(transport->getLifecycleObservers(), IsEmpty());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_F(
QuicTransportImplTest,
LifecycleObserverDetachObserverAfterTransportClose) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverDetachObserverAfterTransportClose) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addLifecycleObserver(cb.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, close(transport.get(), _));
transport->close(folly::none);
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeLifecycleObserver(cb.get()));
EXPECT_TRUE(transport->removeObserver(cb.get()));
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_THAT(transport->getLifecycleObservers(), IsEmpty());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_F(
QuicTransportImplTest,
LifecycleObserverDetachObserverOnCloseDuringTransportDestroy) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
ObserverDetachObserverOnCloseDuringTransportDestroy) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addLifecycleObserver(cb.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
InSequence s;
EXPECT_CALL(*cb, close(transport.get(), _))
.WillOnce(Invoke([&cb](auto callbackTransport, auto /* errorOpt */) {
EXPECT_TRUE(callbackTransport->removeLifecycleObserver(cb.get()));
EXPECT_TRUE(callbackTransport->removeObserver(cb.get()));
}));
EXPECT_CALL(*cb, observerDetach(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(QuicTransportImplTest, LifecycleObserverMultipleAttachRemove) {
auto cb1 = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverMultipleAttachRemove) {
auto cb1 = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb1, observerAttach(transport.get()));
transport->addLifecycleObserver(cb1.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb1.get()));
transport->addObserver(cb1.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
auto cb2 = std::make_unique<StrictMock<MockLifecycleObserver>>();
auto cb2 = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb2, observerAttach(transport.get()));
transport->addLifecycleObserver(cb2.get());
transport->addObserver(cb2.get());
EXPECT_THAT(
transport->getLifecycleObservers(),
UnorderedElementsAre(cb1.get(), cb2.get()));
transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
EXPECT_CALL(*cb2, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeLifecycleObserver(cb2.get()));
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb1.get()));
EXPECT_TRUE(transport->removeObserver(cb2.get()));
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_CALL(*cb1, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeLifecycleObserver(cb1.get()));
EXPECT_THAT(transport->getLifecycleObservers(), IsEmpty());
EXPECT_TRUE(transport->removeObserver(cb1.get()));
EXPECT_THAT(transport->getObservers(), IsEmpty());
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
transport = nullptr;
}
TEST_F(QuicTransportImplTest, LifecycleObserverMultipleAttachDestroyTransport) {
auto cb1 = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverMultipleAttachDestroyTransport) {
auto cb1 = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb1, observerAttach(transport.get()));
transport->addLifecycleObserver(cb1.get());
EXPECT_THAT(
transport->getLifecycleObservers(), UnorderedElementsAre(cb1.get()));
transport->addObserver(cb1.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
auto cb2 = std::make_unique<StrictMock<MockLifecycleObserver>>();
auto cb2 = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb2, observerAttach(transport.get()));
transport->addLifecycleObserver(cb2.get());
transport->addObserver(cb2.get());
EXPECT_THAT(
transport->getLifecycleObservers(),
UnorderedElementsAre(cb1.get(), cb2.get()));
transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
InSequence s;
EXPECT_CALL(*cb1, close(transport.get(), _));
@@ -3577,12 +3559,12 @@ TEST_F(QuicTransportImplTest, LifecycleObserverMultipleAttachDestroyTransport) {
Mock::VerifyAndClearExpectations(cb2.get());
}
TEST_F(QuicTransportImplTest, LifecycleObserverDetachAndAttachEvb) {
auto cb = std::make_unique<StrictMock<MockLifecycleObserver>>();
TEST_F(QuicTransportImplTest, ObserverDetachAndAttachEvb) {
auto cb = std::make_unique<StrictMock<MockObserver>>();
folly::EventBase evb2;
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addLifecycleObserver(cb.get());
transport->addObserver(cb.get());
Mock::VerifyAndClearExpectations(cb.get());
// Detach the event base evb and attach a new event base evb2
@@ -3608,89 +3590,10 @@ TEST_F(QuicTransportImplTest, LifecycleObserverDetachAndAttachEvb) {
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeLifecycleObserver(cb.get()));
EXPECT_TRUE(transport->removeObserver(cb.get()));
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(QuicTransportImplTest, InstrumentationObserverAttachRemove) {
auto cb = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport->addInstrumentationObserver(cb.get());
EXPECT_THAT(
transport->getInstrumentationObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeInstrumentationObserver(cb.get()));
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_THAT(transport->getInstrumentationObservers(), IsEmpty());
}
TEST_F(QuicTransportImplTest, InstrumentationObserverRemoveMissing) {
auto cb = std::make_unique<StrictMock<MockInstrumentationObserver>>();
EXPECT_FALSE(transport->removeInstrumentationObserver(cb.get()));
EXPECT_THAT(transport->getInstrumentationObservers(), IsEmpty());
}
TEST_F(QuicTransportImplTest, InstrumentationObserverAttachDestroyTransport) {
auto cb = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport->addInstrumentationObserver(cb.get());
EXPECT_THAT(
transport->getInstrumentationObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, observerDetach(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(QuicTransportImplTest, InstrumentationObserverMultipleAttachRemove) {
auto cb1 = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport->addInstrumentationObserver(cb1.get());
EXPECT_THAT(
transport->getInstrumentationObservers(),
UnorderedElementsAre(cb1.get()));
auto cb2 = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport->addInstrumentationObserver(cb2.get());
EXPECT_THAT(
transport->getInstrumentationObservers(),
UnorderedElementsAre(cb1.get(), cb2.get()));
EXPECT_CALL(*cb2, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeInstrumentationObserver(cb2.get()));
EXPECT_THAT(
transport->getInstrumentationObservers(),
UnorderedElementsAre(cb1.get()));
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_CALL(*cb1, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeInstrumentationObserver(cb1.get()));
EXPECT_THAT(transport->getInstrumentationObservers(), IsEmpty());
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
transport = nullptr;
}
TEST_F(
QuicTransportImplTest,
InstrumentationObserverMultipleAttachDestroyTransport) {
auto cb1 = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport->addInstrumentationObserver(cb1.get());
EXPECT_THAT(
transport->getInstrumentationObservers(),
UnorderedElementsAre(cb1.get()));
auto cb2 = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport->addInstrumentationObserver(cb2.get());
EXPECT_THAT(
transport->getInstrumentationObservers(),
UnorderedElementsAre(cb1.get(), cb2.get()));
EXPECT_CALL(*cb1, observerDetach(transport.get()));
EXPECT_CALL(*cb2, observerDetach(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
}
TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksDeleted) {
auto noopCallback = [](QuicSocket*) {};
transport->transportConn->pendingCallbacks.emplace_back(noopCallback);
@@ -3718,7 +3621,6 @@ TEST_F(
ImplementationObserverCallbacksCorrectQuicSocket) {
QuicSocket* returnedSocket = nullptr;
auto func = [&](QuicSocket* qSocket) { returnedSocket = qSocket; };
auto ib = MockInstrumentationObserver();
EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
transport->transportConn->pendingCallbacks.emplace_back(func);

View File

@@ -355,9 +355,7 @@ TEST_F(QuicTransportTest, AppLimited) {
transport_->close(folly::none);
}
TEST_F(
QuicTransportTest,
NotAppLimitedWithNoWritableBytesWithInstrumentationObservers) {
TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) {
auto& conn = transport_->getConnectionState();
// Replace with MockConnectionCallback:
auto mockCongestionController =
@@ -372,8 +370,9 @@ TEST_F(
return 0;
}));
auto cb = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport_->addInstrumentationObserver(cb.get());
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport_.get()));
transport_->addObserver(cb.get());
auto stream = transport_->createBidirectionalStream().value();
transport_->writeChain(
@@ -385,14 +384,14 @@ TEST_F(
EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0);
loopForWrites();
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_CALL(*cb, observerDetach(transport_.get()));
EXPECT_CALL(*cb, close(transport_.get(), _)).Times(3);
EXPECT_CALL(*cb, destroy(transport_.get()));
transport_->close(folly::none);
transport_ = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(
QuicTransportTest,
NotAppLimitedWithLargeBufferWithInstrumentationObservers) {
TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) {
auto& conn = transport_->getConnectionState();
// Replace with MockConnectionCallback:
auto mockCongestionController =
@@ -402,8 +401,9 @@ TEST_F(
EXPECT_CALL(*rawCongestionController, getWritableBytes())
.WillRepeatedly(Return(5000));
auto cb = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport_->addInstrumentationObserver(cb.get());
auto cb = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb, observerAttach(transport_.get()));
transport_->addObserver(cb.get());
auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(100 * 2000);
@@ -411,16 +411,20 @@ TEST_F(
EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0);
loopForWrites();
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_CALL(*cb, observerDetach(transport_.get()));
EXPECT_CALL(*cb, close(transport_.get(), _)).Times(3);
EXPECT_CALL(*cb, destroy(transport_.get()));
transport_->close(folly::none);
transport_ = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(QuicTransportTest, AppLimitedWithInstrumentationObservers) {
auto cb1 = std::make_unique<StrictMock<MockInstrumentationObserver>>();
auto cb2 = std::make_unique<StrictMock<MockInstrumentationObserver>>();
transport_->addInstrumentationObserver(cb1.get());
transport_->addInstrumentationObserver(cb2.get());
TEST_F(QuicTransportTest, AppLimitedWithObservers) {
auto cb1 = std::make_unique<StrictMock<MockObserver>>();
auto cb2 = std::make_unique<StrictMock<MockObserver>>();
EXPECT_CALL(*cb1, observerAttach(transport_.get()));
EXPECT_CALL(*cb2, observerAttach(transport_.get()));
transport_->addObserver(cb1.get());
transport_->addObserver(cb2.get());
auto& conn = transport_->getConnectionState();
// Replace with MockConnectionCallback:
@@ -444,9 +448,12 @@ TEST_F(QuicTransportTest, AppLimitedWithInstrumentationObservers) {
loopForWrites();
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_CALL(*cb1, observerDetach(transport_.get()));
EXPECT_CALL(*cb2, observerDetach(transport_.get()));
EXPECT_CALL(*cb1, close(transport_.get(), _)).Times(3);
EXPECT_CALL(*cb2, close(transport_.get(), _)).Times(3);
EXPECT_CALL(*cb1, destroy(transport_.get()));
EXPECT_CALL(*cb2, destroy(transport_.get()));
transport_->close(folly::none);
transport_ = nullptr;
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
}

View File

@@ -45,6 +45,7 @@ QuicClientTransport::QuicClientTransport(
std::make_unique<QuicClientConnectionState>(std::move(handshakeFactory));
clientConn_ = tempConn.get();
conn_.reset(tempConn.release());
conn_->observers = observers_;
auto srcConnId = connectionIdSize > 0
? ConnectionId::createRandom(connectionIdSize)

View File

@@ -26,8 +26,8 @@ static TimePoint reportUpperBound(QuicConnectionStateBase& conn) {
const auto now = Clock::now();
QUIC_STATS(conn.statsCallback, onConnectionPMTUUpperBoundDetected);
if (conn.instrumentationObservers_.size() > 0) {
InstrumentationObserver::PMTUUpperBoundEvent upperBoundEvent(
if (conn.observers->size() > 0) {
Observer::PMTUUpperBoundEvent upperBoundEvent(
now,
std::chrono::duration_cast<std::chrono::microseconds>(
now - d6d.meta.timeLastNonSearchState),
@@ -36,7 +36,7 @@ static TimePoint reportUpperBound(QuicConnectionStateBase& conn) {
d6d.meta.totalTxedProbes,
conn.transportSettings.d6dConfig.raiserType);
// enqueue a function for every observer to invoke callback
for (const auto& observer : conn.instrumentationObservers_) {
for (const auto& observer : *(conn.observers)) {
conn.pendingCallbacks.emplace_back(
[observer, upperBoundEvent](QuicSocket* qSocket) {
observer->pmtuUpperBoundDetected(qSocket, upperBoundEvent);
@@ -56,8 +56,8 @@ static TimePoint reportBlackhole(
QUIC_STATS(conn.statsCallback, onConnectionPMTUBlackholeDetected);
auto& d6d = conn.d6d;
const auto now = Clock::now();
if (conn.instrumentationObservers_.size() > 0) {
InstrumentationObserver::PMTUBlackholeEvent blackholeEvent(
if (conn.observers->size() > 0) {
Observer::PMTUBlackholeEvent blackholeEvent(
now,
std::chrono::duration_cast<std::chrono::microseconds>(
now - d6d.meta.timeLastNonSearchState),
@@ -70,7 +70,7 @@ static TimePoint reportBlackhole(
packet);
// If there are observers, enqueue a function to invoke callback
for (const auto& observer : conn.instrumentationObservers_) {
for (const auto& observer : *(conn.observers)) {
conn.pendingCallbacks.emplace_back(
[observer, blackholeEvent](QuicSocket* qSocket) {
observer->pmtuBlackholeDetected(qSocket, blackholeEvent);

View File

@@ -191,9 +191,10 @@ TEST_F(QuicD6DStateFunctionsTest, D6DProbeAckedInBase) {
const uint16_t expectPMTU = 1400;
auto& d6d = conn.d6d;
auto now = Clock::now();
auto mockInstrumentationObserver =
std::make_unique<StrictMock<MockInstrumentationObserver>>();
conn.instrumentationObservers_.push_back(mockInstrumentationObserver.get());
auto mockObserver = std::make_unique<StrictMock<MockObserver>>();
auto observers = std::make_shared<ObserverVec>();
observers->emplace_back(mockObserver.get());
conn.observers = observers;
d6d.state = D6DMachineState::BASE;
d6d.outstandingProbes = 1;
d6d.currentProbeSize = d6d.basePMTU;
@@ -214,8 +215,7 @@ TEST_F(QuicD6DStateFunctionsTest, D6DProbeAckedInBase) {
EXPECT_CALL(*mockRaiser, raiseProbeSize(d6d.currentProbeSize))
.Times(1)
.WillOnce(Return(expectPMTU));
EXPECT_CALL(*mockInstrumentationObserver, pmtuUpperBoundDetected(_, _))
.Times(0);
EXPECT_CALL(*mockObserver, pmtuUpperBoundDetected(_, _)).Times(0);
onD6DLastProbeAcked(conn);
for (auto& callback : conn.pendingCallbacks) {
callback(nullptr);
@@ -232,9 +232,10 @@ TEST_F(QuicD6DStateFunctionsTest, D6DProbeAckedInSearchingOne) {
const uint16_t expectPMTU = 1400;
auto& d6d = conn.d6d;
auto now = Clock::now();
auto mockInstrumentationObserver =
std::make_unique<StrictMock<MockInstrumentationObserver>>();
conn.instrumentationObservers_.push_back(mockInstrumentationObserver.get());
auto mockObserver = std::make_unique<StrictMock<MockObserver>>();
auto observers = std::make_shared<ObserverVec>();
observers->emplace_back(mockObserver.get());
conn.observers = observers;
d6d.state = D6DMachineState::SEARCHING;
d6d.outstandingProbes = 1;
conn.udpSendPacketLen = 1250;
@@ -256,8 +257,7 @@ TEST_F(QuicD6DStateFunctionsTest, D6DProbeAckedInSearchingOne) {
EXPECT_CALL(*mockRaiser, raiseProbeSize(d6d.currentProbeSize))
.Times(1)
.WillOnce(Return(expectPMTU));
EXPECT_CALL(*mockInstrumentationObserver, pmtuUpperBoundDetected(_, _))
.Times(0);
EXPECT_CALL(*mockObserver, pmtuUpperBoundDetected(_, _)).Times(0);
onD6DLastProbeAcked(conn);
for (auto& callback : conn.pendingCallbacks) {
callback(nullptr);
@@ -274,9 +274,10 @@ TEST_F(QuicD6DStateFunctionsTest, D6DProbeAckedInSearchingMax) {
const uint16_t oversize = 1500;
auto& d6d = conn.d6d;
auto now = Clock::now();
auto mockInstrumentationObserver =
std::make_unique<StrictMock<MockInstrumentationObserver>>();
conn.instrumentationObservers_.push_back(mockInstrumentationObserver.get());
auto mockObserver = std::make_unique<StrictMock<MockObserver>>();
auto observers = std::make_shared<ObserverVec>();
observers->emplace_back(mockObserver.get());
conn.observers = observers;
d6d.state = D6DMachineState::SEARCHING;
d6d.outstandingProbes = 3;
conn.udpSendPacketLen = 1400;
@@ -299,18 +300,16 @@ TEST_F(QuicD6DStateFunctionsTest, D6DProbeAckedInSearchingMax) {
EXPECT_CALL(*mockRaiser, raiseProbeSize(d6d.currentProbeSize))
.Times(1)
.WillOnce(Return(oversize));
EXPECT_CALL(*mockInstrumentationObserver, pmtuUpperBoundDetected(_, _))
EXPECT_CALL(*mockObserver, pmtuUpperBoundDetected(_, _))
.Times(1)
.WillOnce(Invoke(
[&](QuicSocket* /* qSocket */,
const InstrumentationObserver::PMTUUpperBoundEvent& event) {
.WillOnce(Invoke([&](QuicSocket* /* qSocket */,
const Observer::PMTUUpperBoundEvent& event) {
EXPECT_LT(now, event.upperBoundTime);
EXPECT_LT(0us, event.timeSinceLastNonSearchState);
EXPECT_EQ(D6DMachineState::BASE, event.lastNonSearchState);
EXPECT_EQ(1450, event.upperBoundPMTU);
EXPECT_EQ(10, event.cumulativeProbesSent);
EXPECT_EQ(
ProbeSizeRaiserType::ConstantStep, event.probeSizeRaiserType);
EXPECT_EQ(ProbeSizeRaiserType::ConstantStep, event.probeSizeRaiserType);
}));
onD6DLastProbeAcked(conn);
for (auto& callback : conn.pendingCallbacks) {
@@ -327,9 +326,10 @@ TEST_F(QuicD6DStateFunctionsTest, D6DProbeAckedInError) {
QuicConnectionStateBase conn(QuicNodeType::Server);
auto& d6d = conn.d6d;
auto now = Clock::now();
auto mockInstrumentationObserver =
std::make_unique<StrictMock<MockInstrumentationObserver>>();
conn.instrumentationObservers_.push_back(mockInstrumentationObserver.get());
auto mockObserver = std::make_unique<StrictMock<MockObserver>>();
auto observers = std::make_shared<ObserverVec>();
observers->emplace_back(mockObserver.get());
conn.observers = observers;
d6d.state = D6DMachineState::ERROR;
d6d.outstandingProbes = 3;
conn.udpSendPacketLen = d6d.basePMTU;
@@ -351,8 +351,7 @@ TEST_F(QuicD6DStateFunctionsTest, D6DProbeAckedInError) {
EXPECT_CALL(*mockRaiser, raiseProbeSize(d6d.currentProbeSize))
.Times(1)
.WillOnce(Return(1300)); // Won't be used
EXPECT_CALL(*mockInstrumentationObserver, pmtuUpperBoundDetected(_, _))
.Times(0);
EXPECT_CALL(*mockObserver, pmtuUpperBoundDetected(_, _)).Times(0);
onD6DLastProbeAcked(conn);
for (auto& callback : conn.pendingCallbacks) {
callback(nullptr);
@@ -368,9 +367,10 @@ TEST_F(QuicD6DStateFunctionsTest, BlackholeInSearching) {
QuicConnectionStateBase conn(QuicNodeType::Server);
auto& d6d = conn.d6d;
auto now = Clock::now();
auto mockInstrumentationObserver =
std::make_unique<StrictMock<MockInstrumentationObserver>>();
conn.instrumentationObservers_.push_back(mockInstrumentationObserver.get());
auto mockObserver = std::make_unique<StrictMock<MockObserver>>();
auto observers = std::make_shared<ObserverVec>();
observers->emplace_back(mockObserver.get());
conn.observers = observers;
d6d.state = D6DMachineState::SEARCHING;
d6d.outstandingProbes = 2;
conn.udpSendPacketLen = d6d.basePMTU + 20;
@@ -400,11 +400,10 @@ TEST_F(QuicD6DStateFunctionsTest, BlackholeInSearching) {
std::chrono::microseconds(kDefaultD6DBlackholeDetectionWindow).count(),
1); // Threshold of 1 will cause window to be set to 0
EXPECT_CALL(*mockInstrumentationObserver, pmtuBlackholeDetected(_, _))
EXPECT_CALL(*mockObserver, pmtuBlackholeDetected(_, _))
.Times(1)
.WillOnce(
Invoke([&](QuicSocket* /* qSocket */,
const InstrumentationObserver::PMTUBlackholeEvent& event) {
.WillOnce(Invoke([&](QuicSocket* /* qSocket */,
const Observer::PMTUBlackholeEvent& event) {
EXPECT_LE(d6d.meta.timeLastNonSearchState, event.blackholeTime);
EXPECT_EQ(D6DMachineState::BASE, event.lastNonSearchState);
EXPECT_EQ(D6DMachineState::SEARCHING, event.currentState);
@@ -432,9 +431,10 @@ TEST_F(QuicD6DStateFunctionsTest, BlackholeInSearchComplete) {
QuicConnectionStateBase conn(QuicNodeType::Server);
auto& d6d = conn.d6d;
auto now = Clock::now();
auto mockInstrumentationObserver =
std::make_unique<StrictMock<MockInstrumentationObserver>>();
conn.instrumentationObservers_.push_back(mockInstrumentationObserver.get());
auto mockObserver = std::make_unique<StrictMock<MockObserver>>();
auto observers = std::make_shared<ObserverVec>();
observers->emplace_back(mockObserver.get());
conn.observers = observers;
d6d.state = D6DMachineState::SEARCH_COMPLETE;
conn.udpSendPacketLen = d6d.basePMTU + 20;
d6d.currentProbeSize = d6d.basePMTU + 20;
@@ -463,11 +463,10 @@ TEST_F(QuicD6DStateFunctionsTest, BlackholeInSearchComplete) {
std::chrono::microseconds(kDefaultD6DBlackholeDetectionWindow).count(),
1); // Threshold of 1 will cause window to be set to 0
EXPECT_CALL(*mockInstrumentationObserver, pmtuBlackholeDetected(_, _))
EXPECT_CALL(*mockObserver, pmtuBlackholeDetected(_, _))
.Times(1)
.WillOnce(
Invoke([&](QuicSocket* /* qSocket */,
const InstrumentationObserver::PMTUBlackholeEvent& event) {
.WillOnce(Invoke([&](QuicSocket* /* qSocket */,
const Observer::PMTUBlackholeEvent& event) {
EXPECT_EQ(d6d.meta.timeLastNonSearchState, event.blackholeTime);
EXPECT_EQ(D6DMachineState::BASE, event.lastNonSearchState);
EXPECT_EQ(D6DMachineState::SEARCH_COMPLETE, event.currentState);
@@ -495,9 +494,10 @@ TEST_F(QuicD6DStateFunctionsTest, ReachMaxPMTU) {
QuicConnectionStateBase conn(QuicNodeType::Server);
auto& d6d = conn.d6d;
auto now = Clock::now();
auto mockInstrumentationObserver =
std::make_unique<StrictMock<MockInstrumentationObserver>>();
conn.instrumentationObservers_.push_back(mockInstrumentationObserver.get());
auto mockObserver = std::make_unique<StrictMock<MockObserver>>();
auto observers = std::make_shared<ObserverVec>();
observers->emplace_back(mockObserver.get());
conn.observers = observers;
d6d.state = D6DMachineState::SEARCHING;
d6d.maxPMTU = 1452;
d6d.outstandingProbes = 1;

View File

@@ -208,7 +208,7 @@ folly::Optional<CongestionController::LossEvent> detectLossPackets(
<< " delayUntilLost=" << delayUntilLost.count() << "us"
<< " " << conn;
CongestionController::LossEvent lossEvent(lossTime);
InstrumentationObserver::ObserverLossEvent observerLossEvent(lossTime);
Observer::LossEvent observerLossEvent(lossTime);
// Note that time based loss detection is also within the same PNSpace.
auto iter = getFirstOutstandingPacket(conn, pnSpace);
bool shouldSetTimer = false;
@@ -297,7 +297,7 @@ folly::Optional<CongestionController::LossEvent> detectLossPackets(
// if there are observers, enqueue a function to call it
if (observerLossEvent.hasPackets()) {
for (const auto& observer : conn.instrumentationObservers_) {
for (const auto& observer : *(conn.observers)) {
conn.pendingCallbacks.emplace_back(
[observer, observerLossEvent](QuicSocket* qSocket) {
observer->packetLossDetected(qSocket, observerLossEvent);

View File

@@ -94,6 +94,7 @@ class QuicLossFunctionsTest : public TestWithParam<PacketNumberSpace> {
conn->serverConnectionId = *connIdAlgo_->encodeConnectionId(params);
// for canSetLossTimerForAppData()
conn->oneRttWriteCipher = createNoOpAead();
conn->observers = std::make_shared<ObserverVec>();
return conn;
}
@@ -133,8 +134,7 @@ class QuicLossFunctionsTest : public TestWithParam<PacketNumberSpace> {
std::unique_ptr<ConnectionIdAlgo> connIdAlgo_;
auto getLossPacketMatcher(bool lossByReorder, bool lossByTimeout) {
return MockInstrumentationObserver::getLossPacketMatcher(
lossByReorder, lossByTimeout);
return MockObserver::getLossPacketMatcher(lossByReorder, lossByTimeout);
}
};
@@ -1875,10 +1875,12 @@ TEST_F(QuicLossFunctionsTest, PersistentCongestion) {
}
TEST_F(QuicLossFunctionsTest, TestReorderLossObserverCallback) {
auto ib = MockInstrumentationObserver();
auto observers = std::make_shared<ObserverVec>();
auto ib = MockObserver();
auto conn = createConn();
// Register 1 instrumentation observer
conn->instrumentationObservers_.emplace_back(&ib);
// Register 1 life cycle observer
observers->emplace_back(&ib);
conn->observers = observers;
auto noopLossVisitor = [](auto&, auto&, bool) {};
PacketNum largestSent = 0;
@@ -1918,7 +1920,7 @@ TEST_F(QuicLossFunctionsTest, TestReorderLossObserverCallback) {
packetLossDetected(
nullptr,
Field(
&InstrumentationObserver::ObserverLossEvent::lostPackets,
&Observer::LossEvent::lostPackets,
UnorderedElementsAre(
getLossPacketMatcher(true, false),
getLossPacketMatcher(true, false),
@@ -1931,10 +1933,12 @@ TEST_F(QuicLossFunctionsTest, TestReorderLossObserverCallback) {
}
TEST_F(QuicLossFunctionsTest, TestTimeoutLossObserverCallback) {
auto ib = MockInstrumentationObserver();
auto observers = std::make_shared<ObserverVec>();
auto ib = MockObserver();
auto conn = createConn();
// Register 1 instrumentation observer
conn->instrumentationObservers_.emplace_back(&ib);
// Register 1 life cycle observer
observers->emplace_back(&ib);
conn->observers = observers;
auto noopLossVisitor = [](auto&, auto&, bool) {};
PacketNum largestSent = 0;
@@ -1971,7 +1975,7 @@ TEST_F(QuicLossFunctionsTest, TestTimeoutLossObserverCallback) {
packetLossDetected(
nullptr,
Field(
&InstrumentationObserver::ObserverLossEvent::lostPackets,
&Observer::LossEvent::lostPackets,
UnorderedElementsAre(
getLossPacketMatcher(false, true),
getLossPacketMatcher(false, true),
@@ -1988,10 +1992,12 @@ TEST_F(QuicLossFunctionsTest, TestTimeoutLossObserverCallback) {
}
TEST_F(QuicLossFunctionsTest, TestTimeoutAndReorderLossObserverCallback) {
auto ib = MockInstrumentationObserver();
auto observers = std::make_shared<ObserverVec>();
auto ib = MockObserver();
auto conn = createConn();
// Register 1 instrumentation observer
conn->instrumentationObservers_.emplace_back(&ib);
// Register 1 life cycle observer
observers->emplace_back(&ib);
conn->observers = observers;
auto noopLossVisitor = [](auto&, auto&, bool) {};
PacketNum largestSent = 0;
@@ -2034,7 +2040,7 @@ TEST_F(QuicLossFunctionsTest, TestTimeoutAndReorderLossObserverCallback) {
packetLossDetected(
nullptr,
Field(
&InstrumentationObserver::ObserverLossEvent::lostPackets,
&Observer::LossEvent::lostPackets,
UnorderedElementsAre(
getLossPacketMatcher(true, true),
getLossPacketMatcher(true, true),
@@ -2047,7 +2053,7 @@ TEST_F(QuicLossFunctionsTest, TestTimeoutAndReorderLossObserverCallback) {
}
}
TEST_F(QuicLossFunctionsTest, TestNoInstrumentationObserverCallback) {
TEST_F(QuicLossFunctionsTest, TestNoObserverCallback) {
auto conn = createConn();
auto noopLossVisitor = [](auto&, auto&, bool) {};

View File

@@ -16,7 +16,7 @@ class QuicServerWorker;
/**
* Observer of events related to connection acceptance.
*
* This observer can be combined with QuicSocket::LifecycleObserver and other
* This observer can be combined with QuicSocket::Observer and other
* observers to enable instrumentation to be installed when a connection is
* accepted. For instance, a sampling algorithm can be executed in accept() to
* sample and install instrumentation on a subset of connections.

View File

@@ -33,6 +33,8 @@ QuicServerTransport::QuicServerTransport(
tempConn->serverAddr = socket_->address();
serverConn_ = tempConn.get();
conn_.reset(tempConn.release());
conn_->observers = observers_;
// TODO: generate this when we can encode the packet sequence number
// correctly.
// conn_->nextSequenceNum = folly::Random::secureRandom<PacketNum>();
@@ -560,7 +562,7 @@ void QuicServerTransport::maybeStartD6DProbing() {
// valuable
conn_->pendingEvents.d6d.sendProbeDelay = kDefaultD6DKickStartDelay;
QUIC_STATS(conn_->statsCallback, onConnectionD6DStarted);
for (const auto& cb : conn_->instrumentationObservers_) {
for (const auto& cb : *(conn_->observers)) {
cb->pmtuProbingStarted(this);
}
}

View File

@@ -4248,9 +4248,9 @@ TEST_P(
testSetupConnection();
}
TEST_P(QuicServerTransportHandshakeTest, TestD6DStartInstrumentationCallback) {
auto mockObserver = std::make_unique<MockInstrumentationObserver>();
server->addInstrumentationObserver(mockObserver.get());
TEST_P(QuicServerTransportHandshakeTest, TestD6DStartCallback) {
auto mockObserver = std::make_unique<MockObserver>();
server->addObserver(mockObserver.get());
// Set oneRttReader so that maybeStartD6DPriobing passes its check
auto codec = std::make_unique<QuicReadCodec>(QuicNodeType::Server);
codec->setOneRttReadCipher(createNoOpAead());
@@ -4260,7 +4260,7 @@ TEST_P(QuicServerTransportHandshakeTest, TestD6DStartInstrumentationCallback) {
EXPECT_CALL(*mockObserver, pmtuProbingStarted(_)).Times(1);
// CHLO should be enough to trigger probing
recvClientHello();
server->removeInstrumentationObserver(mockObserver.get());
server->removeObserver(mockObserver.get());
}
TEST_F(QuicServerTransportTest, TestRegisterAndHandleTransportKnobParams) {

View File

@@ -144,9 +144,9 @@ void processAckFrame(
auto rttSample = std::chrono::duration_cast<std::chrono::microseconds>(
ackReceiveTimeOrNow - rPacketIt->metadata.time);
if (!ack.implicit && currentPacketNum == frame.largestAcked) {
InstrumentationObserver::PacketRTT packetRTT(
Observer::PacketRTT packetRTT(
ackReceiveTimeOrNow, rttSample, frame.ackDelay, *rPacketIt);
for (const auto& observer : conn.instrumentationObservers_) {
for (const auto& observer : *(conn.observers)) {
conn.pendingCallbacks.emplace_back(
[observer, packetRTT](QuicSocket* qSocket) {
observer->rttSampleGenerated(qSocket, packetRTT);

View File

@@ -491,7 +491,9 @@ class PendingPathRateLimiter;
struct QuicConnectionStateBase : public folly::DelayedDestruction {
virtual ~QuicConnectionStateBase() = default;
explicit QuicConnectionStateBase(QuicNodeType type) : nodeType(type) {}
explicit QuicConnectionStateBase(QuicNodeType type) : nodeType(type) {
observers = std::make_shared<ObserverVec>();
}
// Accessor to output buffer for continuous memory GSO writes
BufAccessor* bufAccessor{nullptr};
@@ -874,12 +876,12 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
*/
bool retireAndSwitchPeerConnectionIds();
// instrumentation observers
InstrumentationObserverVec instrumentationObservers_;
// queue of functions to be called in processCallbacksAfterNetworkData
std::vector<std::function<void(QuicSocket*)>> pendingCallbacks;
// Vector of Observers that are attached to this socket.
std::shared_ptr<const ObserverVec> observers;
// Type of node owning this connection (client or server).
QuicNodeType nodeType;

View File

@@ -1120,9 +1120,11 @@ TEST_P(AckHandlersTest, TestRTTPacketObserverCallback) {
auto mockCongestionController = std::make_unique<MockCongestionController>();
conn.congestionController = std::move(mockCongestionController);
// Register 1 instrumentation observer
auto ib = MockInstrumentationObserver();
conn.instrumentationObservers_.emplace_back(&ib);
// Register 1 observer
auto ib = MockObserver();
auto observers = std::make_shared<ObserverVec>();
observers->emplace_back(&ib);
conn.observers = observers;
PacketNum packetNum = 0;
StreamId streamid = 0;
@@ -1176,7 +1178,7 @@ TEST_P(AckHandlersTest, TestRTTPacketObserverCallback) {
//
// Its important to check the if
// largestAcked - currentPacketNum > reorderingThreshold (currently 3)
// else it can trigger InstrumentationObserver::packetLossDetected
// else it can trigger Observer::packetLossDetected
// and increase the number of callbacks
ackVec.emplace_back(18, 18, 0ms); // +1
ackVec.emplace_back(16, 17, 2ms); // +1
@@ -1206,16 +1208,11 @@ TEST_P(AckHandlersTest, TestRTTPacketObserverCallback) {
rttSampleGenerated(
nullptr,
AllOf(
Field(&Observer::PacketRTT::rcvTime, ackData.ackTime),
Field(&Observer::PacketRTT::rttSample, rttSample),
Field(&Observer::PacketRTT::ackDelay, ackData.ackDelay),
Field(
&InstrumentationObserver::PacketRTT::rcvTime,
ackData.ackTime),
Field(
&InstrumentationObserver::PacketRTT::rttSample, rttSample),
Field(
&InstrumentationObserver::PacketRTT::ackDelay,
ackData.ackDelay),
Field(
&InstrumentationObserver::PacketRTT::metadata,
&Observer::PacketRTT::metadata,
Field(
&quic::OutstandingPacketMetadata::inflightBytes,
ackData.endSeq + 1)))));

View File

@@ -131,12 +131,8 @@ ProbeSizeRaiserType parseRaiserType(uint32_t type) {
}
}
class TPerfInstrumentationObserver : public InstrumentationObserver {
class TPerfObserver : public Observer {
public:
void observerDetach(QuicSocket* /* socket */) noexcept override {
// do nothing
}
void appRateLimited(QuicSocket* /* socket */) override {
if (FLAGS_log_app_rate_limited) {
LOG(INFO) << "appRateLimited detected";
@@ -145,7 +141,7 @@ class TPerfInstrumentationObserver : public InstrumentationObserver {
void packetLossDetected(
QuicSocket*, /* socket */
const struct ObserverLossEvent& /* lossEvent */) override {
const struct LossEvent& /* lossEvent */) override {
if (FLAGS_log_loss) {
LOG(INFO) << "packetLoss detected";
}
@@ -185,16 +181,15 @@ class TPerfInstrumentationObserver : public InstrumentationObserver {
};
/**
* A helper accpetor observer that installs instrumentation observers to
* A helper accpetor observer that installs life cycle observers to
* transport upon accpet
*/
class TPerfAcceptObserver : public AcceptObserver {
public:
TPerfAcceptObserver()
: tperfInstObserver_(std::make_unique<TPerfInstrumentationObserver>()) {}
TPerfAcceptObserver() : tperfObserver_(std::make_unique<TPerfObserver>()) {}
void accept(QuicTransportBase* transport) noexcept override {
transport->addInstrumentationObserver(tperfInstObserver_.get());
transport->addObserver(tperfObserver_.get());
}
void acceptorDestroy(QuicServerWorker* /* worker */) noexcept override {
@@ -210,7 +205,7 @@ class TPerfAcceptObserver : public AcceptObserver {
}
private:
std::unique_ptr<TPerfInstrumentationObserver> tperfInstObserver_;
std::unique_ptr<TPerfObserver> tperfObserver_;
};
} // namespace