diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 3accd6f73..f5297af9f 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -3416,4 +3416,46 @@ void QuicTransportBase::appendCmsgs(const folly::SocketOptionMap& options) { socket_->appendCmsgs(options); } +void QuicTransportBase::setBackgroundModeParameters( + PriorityLevel maxBackgroundPriority, + float backgroundUtilizationFactor) { + backgroundPriorityThreshold_.assign(maxBackgroundPriority); + backgroundUtilizationFactor_.assign(backgroundUtilizationFactor); + conn_->streamManager->setPriorityChangesObserver(this); + onStreamPrioritiesChange(); +} + +void QuicTransportBase::clearBackgroundModeParameters() { + backgroundPriorityThreshold_.clear(); + backgroundUtilizationFactor_.clear(); + conn_->streamManager->resetPriorityChangesObserver(); + onStreamPrioritiesChange(); +} + +// If backgroundPriorityThreshold_ and backgroundUtilizationFactor_ are set and +// all streams have equal or lower priority than the threshold +// (value >= threshold), set the connection's congestion controller to use +// background mode with the set utilization factor. +// In all other cases, turn off the congestion controller's background mode. +void QuicTransportBase::onStreamPrioritiesChange() { + if (conn_->congestionController == nullptr) { + return; + } + if (!backgroundPriorityThreshold_.hasValue() || + !backgroundUtilizationFactor_.hasValue()) { + conn_->congestionController->setBandwidthUtilizationFactor(1.0); + return; + } + bool allStreamsBackground = conn_->streamManager->getHighestPriorityLevel() >= + backgroundPriorityThreshold_.value(); + float targetUtilization = + allStreamsBackground ? backgroundUtilizationFactor_.value() : 1.0f; + VLOG(10) << fmt::format( + "Updating transport background mode. Highest Priority={} Threshold={} TargetUtilization={}", + conn_->streamManager->getHighestPriorityLevel(), + targetUtilization, + backgroundPriorityThreshold_.value()); + conn_->congestionController->setBandwidthUtilizationFactor(targetUtilization); +} + } // namespace quic diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index ca9b8ee63..ffc662f2e 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -35,7 +35,7 @@ enum class CloseState { OPEN, GRACEFUL_CLOSING, CLOSED }; * This is needed in order for QUIC to be able to live beyond the lifetime * of the object that holds it to send graceful close messages to the peer. */ -class QuicTransportBase : public QuicSocket { +class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver { public: QuicTransportBase( folly::EventBase* evb, @@ -419,6 +419,22 @@ class QuicTransportBase : public QuicSocket { const ByteEvent::Type type, const StreamId id) const override; + /* + * Set the background mode priority threshold and the target bw utilization + * factor to use when in background mode. + * + * If all streams have equal or lower priority compares to the threshold + * (value >= threshold), the connection is considered to be in backround mode. + */ + void setBackgroundModeParameters( + PriorityLevel maxBackgroundPriority, + float backgroundUtilizationFactor); + + /* + * Disable background mode by clearing all related parameters. + */ + void clearBackgroundModeParameters(); + // Timeout functions class LossTimeout : public folly::HHWheelTimer::Callback { public: @@ -694,6 +710,13 @@ class QuicTransportBase : public QuicSocket { void handleStreamStopSendingCallbacks(); void handleConnWritable(); + /* + * Observe changes in stream priorities and handle background mode. + * + * Implements the QuicStreamPrioritiesObserver interface + */ + void onStreamPrioritiesChange() override; + void runOnEvbAsync( folly::Function)> func); @@ -1000,6 +1023,12 @@ class QuicTransportBase : public QuicSocket { // callback object or two new split callback objects. Will be removed out once // mvfst is switched to the new split callbacks eventually. bool useSplitConnectionCallbacks_{false}; + + // Priority level threshold for background streams + // If all streams have equal or lower priority to the threshold + // (value >= threshold), the connection is considered to be in backround mode. + folly::Optional backgroundPriorityThreshold_; + folly::Optional backgroundUtilizationFactor_; }; std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt); diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index fd121773c..328f1b31c 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -3870,5 +3871,82 @@ TEST_F(QuicTransportImplTest, Cmsgs) { transport->appendCmsgs(cmsgs); } +TEST_F(QuicTransportImplTest, BackgroundModeChangeWithStreamChanges) { + // Verify that background mode is correctly turned on and off + // based upon stream creation, priority changes, stream removal. + // For different steps try local (uni/bi)directional streams and remote + // streams + InSequence s; + auto& conn = transport->getConnectionState(); + auto mockCongestionController = + std::make_unique>(); + auto rawCongestionController = mockCongestionController.get(); + conn.congestionController = std::move(mockCongestionController); + auto& manager = *conn.streamManager; + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(_)) + .Times(0); // Backgound params not set + auto stream = manager.createNextUnidirectionalStream().value(); + manager.setStreamPriority(stream->id, 1, false); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5)) + .Times(1); // On setting the background params + transport->setBackgroundModeParameters(1, 0.5); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5)) + .Times(1); // On removing a closed stream + stream->sendState = StreamSendState::Closed; + stream->recvState = StreamRecvState::Closed; + manager.removeClosedStream(stream->id); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5)) + .Times(2); // On stream creation - create two streams - one bidirectional + auto stream2Id = manager.createNextUnidirectionalStream().value()->id; + auto stream3id = manager.createNextBidirectionalStream().value()->id; + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0)) + .Times(1); // On increasing the priority of one of the streams + manager.setStreamPriority(stream3id, 0, false); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0)) + .Times(1); // a new lower priority stream does not affect the utlization + // factor + auto streamLower = manager.createNextBidirectionalStream().value(); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0)) + .Times(1); // On removing a closed stream + streamLower->sendState = StreamSendState::Closed; + streamLower->recvState = StreamRecvState::Closed; + manager.removeClosedStream(streamLower->id); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5)) + .Times(1); // On removing a closed stream + CHECK_NOTNULL(manager.getStream(stream3id))->sendState = + StreamSendState::Closed; + CHECK_NOTNULL(manager.getStream(stream3id))->recvState = + StreamRecvState::Closed; + manager.removeClosedStream(stream3id); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5)) + .Times(1); // On stream creation - remote stream + auto peerStreamId = 20; + ASSERT_TRUE(isRemoteStream(conn.nodeType, peerStreamId)); + auto stream4 = manager.getStream(peerStreamId); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0)) + .Times(1); // On clearing the background parameters + transport->clearBackgroundModeParameters(); + + EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(_)) + .Times(0); // Background params not set + stream4->sendState = StreamSendState::Closed; + stream4->recvState = StreamRecvState::Closed; + manager.removeClosedStream(stream4->id); + CHECK_NOTNULL(manager.getStream(stream2Id))->sendState = + StreamSendState::Closed; + CHECK_NOTNULL(manager.getStream(stream2Id))->recvState = + StreamRecvState::Closed; + manager.removeClosedStream(stream2Id); +} + } // namespace test } // namespace quic diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index f727eaa19..9df838ea3 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -236,6 +236,15 @@ bool QuicStreamManager::setStreamPriority( return false; } stream->priority = newPriority; + auto priorityMapEntry = streamPriorityLevels_.find(id); + if (priorityMapEntry == streamPriorityLevels_.end()) { + throw QuicTransportException( + "Active stream not in stream priority map", + TransportErrorCode::STREAM_STATE_ERROR); + } else { + priorityMapEntry->second = newPriority.level; + } + notifyStreamPriorityChanges(); // If this stream is already in the writable or loss queus, update the // priority there. writableStreams_.updateIfExist(id, stream->priority); @@ -273,6 +282,7 @@ QuicStreamManager::getOrCreateOpenedLocalStream(StreamId streamId) { throw QuicTransportException( "Creating an active stream", TransportErrorCode::STREAM_STATE_ERROR); } + addToStreamPriorityMap(it.first->second); return &it.first->second; } return nullptr; @@ -358,6 +368,7 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) { std::piecewise_construct, std::forward_as_tuple(streamId), std::forward_as_tuple(streamId, conn_)); + addToStreamPriorityMap(it.first->second); QUIC_STATS(conn_.statsCallback, onNewQuicStream); return &it.first->second; } @@ -386,6 +397,7 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) { std::piecewise_construct, std::forward_as_tuple(streamId), std::forward_as_tuple(streamId, conn_)); + addToStreamPriorityMap(it.first->second); QUIC_STATS(conn_.statsCallback, onNewQuicStream); return &it.first->second; } @@ -423,6 +435,7 @@ QuicStreamManager::createStream(StreamId streamId) { std::piecewise_construct, std::forward_as_tuple(streamId), std::forward_as_tuple(streamId, conn_)); + addToStreamPriorityMap(it.first->second); QUIC_STATS(conn_.statsCallback, onNewQuicStream); updateAppIdleState(); return &it.first->second; @@ -448,6 +461,15 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { windowUpdates_.erase(streamId); stopSendingStreams_.erase(streamId); flowControlUpdated_.erase(streamId); + { + const auto streamPriorityIt = streamPriorityLevels_.find(streamId); + if (streamPriorityIt == streamPriorityLevels_.end()) { + throw QuicTransportException( + "Removed stream is not in the priority map", + TransportErrorCode::STREAM_STATE_ERROR); + } + streamPriorityLevels_.erase(streamPriorityIt); + } if (it->second.isControl) { DCHECK_GT(numControlStreams_, 0); numControlStreams_--; @@ -495,6 +517,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { openLocalStreams.erase(streamId); } updateAppIdleState(); + notifyStreamPriorityChanges(); } void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) { @@ -567,4 +590,59 @@ bool QuicStreamManager::isAppIdle() const { return isAppIdle_; } +PriorityLevel QuicStreamManager::getHighestPriorityLevel() const { + // Highest priority is minimum value + auto min = kDefaultMaxPriority; + for (auto& entry : streamPriorityLevels_) { + if (entry.second < min) { + min = entry.second; + } + if (min == 0) { + break; + } + } + return min; +} + +void QuicStreamManager::setPriorityChangesObserver( + QuicStreamPrioritiesObserver* observer) { + priorityChangesObserver_ = observer; +} + +void QuicStreamManager::resetPriorityChangesObserver() { + if (!priorityChangesObserver_) { + return; + } + priorityChangesObserver_ = nullptr; +} + +void QuicStreamManager::notifyStreamPriorityChanges() { + if (priorityChangesObserver_) { + priorityChangesObserver_->onStreamPrioritiesChange(); + } +} + +void QuicStreamManager::addToStreamPriorityMap( + const QuicStreamState& streamState) { + auto entry = streamPriorityLevels_.emplace( + streamState.id, PriorityLevel(streamState.priority.level)); + + // Verify stream didn't already exist in streamPriorityLevels_ + if (!entry.second) { + throw QuicTransportException( + "Attempted to add stream already in priority map", + TransportErrorCode::STREAM_STATE_ERROR); + } + + // Verify inserted item + if (entry.first->second != PriorityLevel(streamState.priority.level)) { + throw QuicTransportException( + "Failed to add stream to priority map", + TransportErrorCode::STREAM_STATE_ERROR); + } + + // Notify observer (if set) + notifyStreamPriorityChanges(); +} + } // namespace quic diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index decc202b8..411382e81 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -110,6 +111,7 @@ class QuicStreamManager { newPeerStreams_ = std::move(other.newPeerStreams_); blockedStreams_ = std::move(other.blockedStreams_); stopSendingStreams_ = std::move(other.stopSendingStreams_); + streamPriorityLevels_ = std::move(other.streamPriorityLevels_); windowUpdates_ = std::move(other.windowUpdates_); flowControlUpdated_ = std::move(other.flowControlUpdated_); lossStreams_ = std::move(other.lossStreams_); @@ -837,6 +839,23 @@ class QuicStreamManager { bool isAppIdle() const; + /* + * Sets an observer that will be notified whenever the set of stream + * priorities changes + */ + void setPriorityChangesObserver(QuicStreamPrioritiesObserver* observer); + + /* + * Stops notifications for changes to the set of stream priorities + */ + void resetPriorityChangesObserver(); + + /* + * Returns the highest priority level used by any stream + * (Highest priority is lowest value) + */ + [[nodiscard]] PriorityLevel getHighestPriorityLevel() const; + private: // Updates the congestion controller app-idle state, after a change in the // number of streams. @@ -858,6 +877,9 @@ class QuicStreamManager { uint64_t maxStreams, bool force); + void addToStreamPriorityMap(const QuicStreamState& streamState); + void notifyStreamPriorityChanges(); + QuicConnectionStateBase& conn_; QuicNodeType nodeType_; @@ -937,6 +959,9 @@ class QuicStreamManager { // Map of streams where the peer was asked to stop sending folly::F14FastMap stopSendingStreams_; + // Map of stream priority levels + folly::F14FastMap streamPriorityLevels_; + // Streams that had their stream window change and potentially need a window // update sent folly::F14FastSet windowUpdates_; @@ -969,6 +994,9 @@ class QuicStreamManager { // Streams that are closed but we still have state for folly::F14FastSet closedStreams_; + // Observer to notify on changes in the streamPriorityLevels_ map + QuicStreamPrioritiesObserver* priorityChangesObserver_{nullptr}; + // Record whether or not we are app-idle. bool isAppIdle_{false}; diff --git a/quic/state/QuicStreamPrioritiesObserver.h b/quic/state/QuicStreamPrioritiesObserver.h new file mode 100644 index 000000000..d671e75d7 --- /dev/null +++ b/quic/state/QuicStreamPrioritiesObserver.h @@ -0,0 +1,20 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + */ + +#pragma once + +namespace quic { + +class QuicStreamPrioritiesObserver { + public: + virtual ~QuicStreamPrioritiesObserver() = default; + + virtual void onStreamPrioritiesChange() = 0; +}; + +} // namespace quic diff --git a/quic/state/test/Mocks.h b/quic/state/test/Mocks.h index e144464ca..c29148610 100644 --- a/quic/state/test/Mocks.h +++ b/quic/state/test/Mocks.h @@ -64,5 +64,10 @@ class MockPendingPathRateLimiter : public PendingPathRateLimiter { currentCredit, uint64_t(TimePoint, std::chrono::microseconds)); }; + +class MockQuicStreamPrioritiesObserver : public QuicStreamPrioritiesObserver { + public: + MOCK_METHOD0(onStreamPrioritiesChange, void()); +}; } // namespace test } // namespace quic diff --git a/quic/state/test/QuicStreamManagerTest.cpp b/quic/state/test/QuicStreamManagerTest.cpp index 7c29d3e4b..0446dc96b 100644 --- a/quic/state/test/QuicStreamManagerTest.cpp +++ b/quic/state/test/QuicStreamManagerTest.cpp @@ -11,7 +11,9 @@ #include #include +#include #include +#include #include using namespace testing; @@ -338,5 +340,71 @@ TEST_F(QuicStreamManagerTest, WriteBufferMeta) { EXPECT_TRUE(manager.writableDSRStreams().empty()); } +TEST_F(QuicStreamManagerTest, NotifyOnStreamPriorityChanges) { + // Verify that the StreamPriorityChanges callback function is called + // upon stream creation, priority changes, stream removal. + // For different steps try local (uni/bi)directional streams and remote + // streams + + MockQuicStreamPrioritiesObserver mObserver; + + auto& manager = *conn.streamManager; + manager.setPriorityChangesObserver(&mObserver); + EXPECT_CALL(mObserver, onStreamPrioritiesChange()) + .Times(2); // On stream creation and on setting the priority + auto stream = manager.createNextUnidirectionalStream().value(); + EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level); + + manager.setStreamPriority(stream->id, 1, false); + EXPECT_EQ(manager.getHighestPriorityLevel(), 1); + + EXPECT_CALL(mObserver, onStreamPrioritiesChange()) + .Times(1); // On removing a closed stream + stream->sendState = StreamSendState::Closed; + stream->recvState = StreamRecvState::Closed; + manager.removeClosedStream(stream->id); + // No active stream. Highest priority should return the max value (least + // priority). + EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultMaxPriority); + + EXPECT_CALL(mObserver, onStreamPrioritiesChange()) + .Times(2); // On stream creation - create two streams - one bidirectional + auto stream2Id = manager.createNextUnidirectionalStream().value()->id; + auto stream3 = manager.createNextBidirectionalStream().value(); + EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level); + + EXPECT_CALL(mObserver, onStreamPrioritiesChange()) + .Times(1); // On increasing the priority of one of the streams + manager.setStreamPriority(stream3->id, 0, false); + EXPECT_EQ(manager.getHighestPriorityLevel(), 0); + + EXPECT_CALL(mObserver, onStreamPrioritiesChange()) + .Times(1); // On removing a closed stream; + stream3->sendState = StreamSendState::Closed; + stream3->recvState = StreamRecvState::Closed; + manager.removeClosedStream(stream3->id); + EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level); + + EXPECT_CALL(mObserver, onStreamPrioritiesChange()) + .Times(1); // On stream creation - remote stream + auto peerStreamId = 20; + ASSERT_TRUE(isRemoteStream(conn.nodeType, peerStreamId)); + auto stream4 = manager.getStream(peerStreamId); + EXPECT_NE(stream4, nullptr); + EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level); + + EXPECT_CALL(mObserver, onStreamPrioritiesChange()) + .Times(0); // Removing streams but observer removed + manager.resetPriorityChangesObserver(); + stream4->sendState = StreamSendState::Closed; + stream4->recvState = StreamRecvState::Closed; + manager.removeClosedStream(stream4->id); + CHECK_NOTNULL(manager.getStream(stream2Id))->sendState = + StreamSendState::Closed; + CHECK_NOTNULL(manager.getStream(stream2Id))->recvState = + StreamRecvState::Closed; + manager.removeClosedStream(stream2Id); +} + } // namespace test } // namespace quic