diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index ff3d933d4..b70728a59 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -952,7 +952,7 @@ class QuicSocket { virtual void observerAttach(QuicSocket* /* socket */) noexcept = 0; /** - * observerDetached() will be invoked if the observer is uninstalled prior + * observerDetach() will be invoked if the observer is uninstalled prior * to socket destruction. * * No further callbacks will be invoked after observerDetach(). @@ -1020,5 +1020,64 @@ class QuicSocket { */ FOLLY_NODISCARD virtual const LifecycleObserverVec& getLifecycleObservers() const = 0; + + /** + * ===== Instrumentation Observer API ===== + */ + + /** + * Observer of socket instrumentation events. + */ + class InstrumentationObserver { + public: + virtual ~InstrumentationObserver() = default; + + /** + * observerDetach() will be invoked when the observer is uninstalled. + * + * No further callbacks will be invoked after observerDetach(). + * + * @param socket Socket where observer was uninstalled. + */ + virtual void observerDetach(QuicSocket* /* socket */) noexcept = 0; + + /** + * appRateLimited() is invoked when the socket is app rate limited. + * + * @param socket Socket that has become application rate limited. + */ + virtual void appRateLimited(QuicSocket* /* socket */) {} + }; + + // Container for instrumentation observers. + // Avoids heap allocation for up to 2 observers being installed. + using InstrumentationObserverVec = SmallVec; + + /** + * Adds a instrumentation observer. + * + * Instrumentation observers get notified of various socket events. + * + * @param observer Observer to add (implements InstrumentationObserver). + */ + virtual void addInstrumentationObserver( + InstrumentationObserver* observer) = 0; + + /** + * Removes a instrumentation observer. + * + * @param observer Observer to remove. + * @return Whether observer found and removed from list. + */ + virtual bool removeInstrumentationObserver( + InstrumentationObserver* observer) = 0; + + /** + * Returns installed instrumentation observers. + * + * @return Reference to const vector with installed observers. + */ + FOLLY_NODISCARD virtual const InstrumentationObserverVec& + getInstrumentationObservers() const = 0; }; } // namespace quic diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index eefd54aed..e3d4b1a2a 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -225,6 +225,11 @@ void QuicTransportBase::closeImpl( for (const auto& cb : lifecycleObservers_) { cb->close(this, errorCode); } + for (const auto& cb : instrumentationObservers_) { + cb->observerDetach(this); + } + instrumentationObservers_.clear(); + if (closeState_ == CloseState::CLOSED) { return; } @@ -2364,7 +2369,7 @@ void QuicTransportBase::cancelAllAppCallbacks( } void QuicTransportBase::addLifecycleObserver(LifecycleObserver* observer) { - lifecycleObservers_.push_back(observer); + lifecycleObservers_.push_back(CHECK_NOTNULL(observer)); observer->observerAttach(this); } @@ -2387,6 +2392,33 @@ QuicTransportBase::getLifecycleObservers() const { return lifecycleObservers_; } +void QuicTransportBase::addInstrumentationObserver( + InstrumentationObserver* observer) { + instrumentationObservers_.push_back(CHECK_NOTNULL(observer)); +} + +bool QuicTransportBase::removeInstrumentationObserver( + InstrumentationObserver* observer) { + const auto eraseIt = std::remove( + instrumentationObservers_.begin(), + instrumentationObservers_.end(), + observer); + if (eraseIt == instrumentationObservers_.end()) { + return false; + } + + for (auto it = eraseIt; it != instrumentationObservers_.end(); it++) { + (*it)->observerDetach(this); + } + instrumentationObservers_.erase(eraseIt, instrumentationObservers_.end()); + return true; +} + +const QuicTransportBase::InstrumentationObserverVec& +QuicTransportBase::getInstrumentationObservers() const { + return instrumentationObservers_; +} + void QuicTransportBase::writeSocketData() { if (socket_) { auto packetsBefore = conn_->outstandings.packets.size(); @@ -2433,7 +2465,11 @@ void QuicTransportBase::writeSocketData() { currentSendBufLen < conn_->udpSendPacketLen && lossBufferEmpty && conn_->congestionController->getWritableBytes()) { conn_->congestionController->setAppLimited(); + // notify via connection call and any instrumentation callbacks connCallback_->onAppRateLimited(); + for (const auto& cb : instrumentationObservers_) { + cb->appRateLimited(this); + } } } } diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index 35246734f..90d7d7765 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -510,6 +510,32 @@ class QuicTransportBase : public QuicSocket { FOLLY_NODISCARD const LifecycleObserverVec& getLifecycleObservers() const override; + /** + * Adds a instrumentation observer. + * + * Instrumentation observers get notified of various socket events. + * + * @param observer Observer to add (implements InstrumentationObserver). + */ + void addInstrumentationObserver(InstrumentationObserver* observer) override; + + /** + * Removes a instrumentation observer. + * + * @param observer Observer to remove. + * @return Whether observer found and removed from list. + */ + bool removeInstrumentationObserver( + InstrumentationObserver* observer) override; + + /** + * Returns installed instrumentation observers. + * + * @return Reference to const vector with installed observers. + */ + FOLLY_NODISCARD const InstrumentationObserverVec& + getInstrumentationObservers() const override; + protected: void processCallbacksAfterNetworkData(); void invokeReadDataAndCallbacks(); @@ -662,6 +688,9 @@ class QuicTransportBase : public QuicSocket { // Lifecycle observers LifecycleObserverVec lifecycleObservers_; + + // Instrumentation observers + InstrumentationObserverVec instrumentationObservers_; }; std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt); diff --git a/quic/api/test/MockQuicSocket.h b/quic/api/test/MockQuicSocket.h index a650efec5..1a032c3ad 100644 --- a/quic/api/test/MockQuicSocket.h +++ b/quic/api/test/MockQuicSocket.h @@ -240,5 +240,11 @@ class MockQuicSocket : public QuicSocket { MOCK_METHOD1(addLifecycleObserver, void(LifecycleObserver*)); MOCK_METHOD1(removeLifecycleObserver, bool(LifecycleObserver*)); MOCK_CONST_METHOD0(getLifecycleObservers, const LifecycleObserverVec&()); + + MOCK_METHOD1(addInstrumentationObserver, void(InstrumentationObserver*)); + MOCK_METHOD1(removeInstrumentationObserver, bool(InstrumentationObserver*)); + MOCK_CONST_METHOD0( + getInstrumentationObservers, + const InstrumentationObserverVec&()); }; } // namespace quic diff --git a/quic/api/test/Mocks.h b/quic/api/test/Mocks.h index aeb455423..9ff06f7a1 100644 --- a/quic/api/test/Mocks.h +++ b/quic/api/test/Mocks.h @@ -297,6 +297,12 @@ class MockLifecycleObserver : public QuicSocket::LifecycleObserver { const folly::Optional>&)); }; +class MockInstrumentationObserver : public QuicSocket::InstrumentationObserver { + public: + GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*)); + GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*)); +}; + inline std::ostream& operator<<(std::ostream& os, const MockQuicTransport&) { return os; } diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 6382ae7f2..89a44075d 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -2968,5 +2968,84 @@ TEST_F(QuicTransportImplTest, LifecycleObserverMultipleAttachDestroyTransport) { Mock::VerifyAndClearExpectations(cb2.get()); } +TEST_F(QuicTransportImplTest, InstrumentationObserverAttachRemove) { + auto cb = std::make_unique>(); + transport->addInstrumentationObserver(cb.get()); + EXPECT_THAT( + transport->getInstrumentationObservers(), UnorderedElementsAre(cb.get())); + EXPECT_CALL(*cb, observerDetach(transport.get())); + EXPECT_TRUE(transport->removeInstrumentationObserver(cb.get())); + Mock::VerifyAndClearExpectations(cb.get()); + EXPECT_THAT(transport->getInstrumentationObservers(), IsEmpty()); +} + +TEST_F(QuicTransportImplTest, InstrumentationObserverRemoveMissing) { + auto cb = std::make_unique>(); + EXPECT_FALSE(transport->removeInstrumentationObserver(cb.get())); + EXPECT_THAT(transport->getInstrumentationObservers(), IsEmpty()); +} + +TEST_F(QuicTransportImplTest, InstrumentationObserverAttachDestroyTransport) { + auto cb = std::make_unique>(); + transport->addInstrumentationObserver(cb.get()); + EXPECT_THAT( + transport->getInstrumentationObservers(), UnorderedElementsAre(cb.get())); + EXPECT_CALL(*cb, observerDetach(transport.get())); + transport = nullptr; + Mock::VerifyAndClearExpectations(cb.get()); +} + +TEST_F(QuicTransportImplTest, InstrumentationObserverMultipleAttachRemove) { + auto cb1 = std::make_unique>(); + transport->addInstrumentationObserver(cb1.get()); + EXPECT_THAT( + transport->getInstrumentationObservers(), + UnorderedElementsAre(cb1.get())); + + auto cb2 = std::make_unique>(); + transport->addInstrumentationObserver(cb2.get()); + EXPECT_THAT( + transport->getInstrumentationObservers(), + UnorderedElementsAre(cb1.get(), cb2.get())); + + EXPECT_CALL(*cb2, observerDetach(transport.get())); + EXPECT_TRUE(transport->removeInstrumentationObserver(cb2.get())); + EXPECT_THAT( + transport->getInstrumentationObservers(), + UnorderedElementsAre(cb1.get())); + Mock::VerifyAndClearExpectations(cb1.get()); + Mock::VerifyAndClearExpectations(cb2.get()); + + EXPECT_CALL(*cb1, observerDetach(transport.get())); + EXPECT_TRUE(transport->removeInstrumentationObserver(cb1.get())); + EXPECT_THAT(transport->getInstrumentationObservers(), IsEmpty()); + Mock::VerifyAndClearExpectations(cb1.get()); + Mock::VerifyAndClearExpectations(cb2.get()); + + transport = nullptr; +} + +TEST_F( + QuicTransportImplTest, + InstrumentationObserverMultipleAttachDestroyTransport) { + auto cb1 = std::make_unique>(); + transport->addInstrumentationObserver(cb1.get()); + EXPECT_THAT( + transport->getInstrumentationObservers(), + UnorderedElementsAre(cb1.get())); + + auto cb2 = std::make_unique>(); + transport->addInstrumentationObserver(cb2.get()); + EXPECT_THAT( + transport->getInstrumentationObservers(), + UnorderedElementsAre(cb1.get(), cb2.get())); + + EXPECT_CALL(*cb1, observerDetach(transport.get())); + EXPECT_CALL(*cb2, observerDetach(transport.get())); + transport = nullptr; + Mock::VerifyAndClearExpectations(cb1.get()); + Mock::VerifyAndClearExpectations(cb2.get()); +} + } // namespace test } // namespace quic diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 383f2b909..88a252cf7 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -355,6 +355,102 @@ TEST_F(QuicTransportTest, AppLimited) { transport_->close(folly::none); } +TEST_F( + QuicTransportTest, + NotAppLimitedWithNoWritableBytesWithInstrumentationObservers) { + auto& conn = transport_->getConnectionState(); + // Replace with MockConnectionCallback: + auto mockCongestionController = + std::make_unique>(); + auto rawCongestionController = mockCongestionController.get(); + conn.congestionController = std::move(mockCongestionController); + EXPECT_CALL(*rawCongestionController, getWritableBytes()) + .WillRepeatedly(Invoke([&]() { + if (conn.outstandings.packets.empty()) { + return 5000; + } + return 0; + })); + + auto cb = std::make_unique>(); + transport_->addInstrumentationObserver(cb.get()); + + auto stream = transport_->createBidirectionalStream().value(); + transport_->writeChain( + stream, + IOBuf::copyBuffer("An elephant sitting still"), + false, + false, + nullptr); + EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0); + loopForWrites(); + Mock::VerifyAndClearExpectations(cb.get()); + EXPECT_CALL(*cb, observerDetach(transport_.get())); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(cb.get()); +} + +TEST_F( + QuicTransportTest, + NotAppLimitedWithLargeBufferWithInstrumentationObservers) { + auto& conn = transport_->getConnectionState(); + // Replace with MockConnectionCallback: + auto mockCongestionController = + std::make_unique>(); + auto rawCongestionController = mockCongestionController.get(); + conn.congestionController = std::move(mockCongestionController); + EXPECT_CALL(*rawCongestionController, getWritableBytes()) + .WillRepeatedly(Return(5000)); + + auto cb = std::make_unique>(); + transport_->addInstrumentationObserver(cb.get()); + + auto stream = transport_->createBidirectionalStream().value(); + auto buf = buildRandomInputData(100 * 2000); + transport_->writeChain(stream, buf->clone(), false, false, nullptr); + EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0); + loopForWrites(); + Mock::VerifyAndClearExpectations(cb.get()); + EXPECT_CALL(*cb, observerDetach(transport_.get())); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(cb.get()); +} + +TEST_F(QuicTransportTest, AppLimitedWithInstrumentationObservers) { + auto cb1 = std::make_unique>(); + auto cb2 = std::make_unique>(); + transport_->addInstrumentationObserver(cb1.get()); + transport_->addInstrumentationObserver(cb2.get()); + + auto& conn = transport_->getConnectionState(); + // Replace with MockConnectionCallback: + auto mockCongestionController = + std::make_unique>(); + auto rawCongestionController = mockCongestionController.get(); + conn.congestionController = std::move(mockCongestionController); + EXPECT_CALL(*rawCongestionController, getWritableBytes()) + .WillRepeatedly(Return(5000)); + + auto stream = transport_->createBidirectionalStream().value(); + transport_->writeChain( + stream, + IOBuf::copyBuffer("An elephant sitting still"), + false, + false, + nullptr); + EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(1); + EXPECT_CALL(*cb1, appRateLimited(transport_.get())); + EXPECT_CALL(*cb2, appRateLimited(transport_.get())); + loopForWrites(); + Mock::VerifyAndClearExpectations(cb1.get()); + Mock::VerifyAndClearExpectations(cb2.get()); + EXPECT_CALL(*cb1, observerDetach(transport_.get())); + EXPECT_CALL(*cb2, observerDetach(transport_.get())); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(cb1.get()); + Mock::VerifyAndClearExpectations(cb2.get()); +} + TEST_F(QuicTransportTest, WriteSmall) { // Testing writing a small buffer that could be fit in a single packet auto stream = transport_->createBidirectionalStream().value();