1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Control a connections bandwidth utilization factor in QuicTransportBase (background mode)

Summary:
The StreamManager monitors can now have a reference to a new observer interface QuicStreamPrioritiesObserver. If the reference is set, the observer is notified whenever streams are created, removed, or change priorities

QuicTransportBase implements this observer interface. It uses the event notifications to control a connection's background mode based upon the new background mode parameters: priority level threshold, and target utilization factor. When these parameters are set, and all active streams have a priority lower than the threshold, the connection's congestion controller is set to use only the defined utilization factor of the available bandwidth.

Reviewed By: bschlinker

Differential Revision: D31562505

fbshipit-source-id: 9c74fa834301f745d97e851741b911795f756023
This commit is contained in:
Joseph Beshay
2021-10-22 14:51:31 -07:00
committed by Facebook GitHub Bot
parent 4609c8741b
commit ea2af0fe10
8 changed files with 349 additions and 1 deletions

View File

@@ -3416,4 +3416,46 @@ void QuicTransportBase::appendCmsgs(const folly::SocketOptionMap& options) {
socket_->appendCmsgs(options);
}
void QuicTransportBase::setBackgroundModeParameters(
PriorityLevel maxBackgroundPriority,
float backgroundUtilizationFactor) {
backgroundPriorityThreshold_.assign(maxBackgroundPriority);
backgroundUtilizationFactor_.assign(backgroundUtilizationFactor);
conn_->streamManager->setPriorityChangesObserver(this);
onStreamPrioritiesChange();
}
void QuicTransportBase::clearBackgroundModeParameters() {
backgroundPriorityThreshold_.clear();
backgroundUtilizationFactor_.clear();
conn_->streamManager->resetPriorityChangesObserver();
onStreamPrioritiesChange();
}
// If backgroundPriorityThreshold_ and backgroundUtilizationFactor_ are set and
// all streams have equal or lower priority than the threshold
// (value >= threshold), set the connection's congestion controller to use
// background mode with the set utilization factor.
// In all other cases, turn off the congestion controller's background mode.
void QuicTransportBase::onStreamPrioritiesChange() {
if (conn_->congestionController == nullptr) {
return;
}
if (!backgroundPriorityThreshold_.hasValue() ||
!backgroundUtilizationFactor_.hasValue()) {
conn_->congestionController->setBandwidthUtilizationFactor(1.0);
return;
}
bool allStreamsBackground = conn_->streamManager->getHighestPriorityLevel() >=
backgroundPriorityThreshold_.value();
float targetUtilization =
allStreamsBackground ? backgroundUtilizationFactor_.value() : 1.0f;
VLOG(10) << fmt::format(
"Updating transport background mode. Highest Priority={} Threshold={} TargetUtilization={}",
conn_->streamManager->getHighestPriorityLevel(),
targetUtilization,
backgroundPriorityThreshold_.value());
conn_->congestionController->setBandwidthUtilizationFactor(targetUtilization);
}
} // namespace quic

View File

@@ -35,7 +35,7 @@ enum class CloseState { OPEN, GRACEFUL_CLOSING, CLOSED };
* This is needed in order for QUIC to be able to live beyond the lifetime
* of the object that holds it to send graceful close messages to the peer.
*/
class QuicTransportBase : public QuicSocket {
class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver {
public:
QuicTransportBase(
folly::EventBase* evb,
@@ -419,6 +419,22 @@ class QuicTransportBase : public QuicSocket {
const ByteEvent::Type type,
const StreamId id) const override;
/*
* Set the background mode priority threshold and the target bw utilization
* factor to use when in background mode.
*
* If all streams have equal or lower priority compares to the threshold
* (value >= threshold), the connection is considered to be in backround mode.
*/
void setBackgroundModeParameters(
PriorityLevel maxBackgroundPriority,
float backgroundUtilizationFactor);
/*
* Disable background mode by clearing all related parameters.
*/
void clearBackgroundModeParameters();
// Timeout functions
class LossTimeout : public folly::HHWheelTimer::Callback {
public:
@@ -694,6 +710,13 @@ class QuicTransportBase : public QuicSocket {
void handleStreamStopSendingCallbacks();
void handleConnWritable();
/*
* Observe changes in stream priorities and handle background mode.
*
* Implements the QuicStreamPrioritiesObserver interface
*/
void onStreamPrioritiesChange() override;
void runOnEvbAsync(
folly::Function<void(std::shared_ptr<QuicTransportBase>)> func);
@@ -1000,6 +1023,12 @@ class QuicTransportBase : public QuicSocket {
// callback object or two new split callback objects. Will be removed out once
// mvfst is switched to the new split callbacks eventually.
bool useSplitConnectionCallbacks_{false};
// Priority level threshold for background streams
// If all streams have equal or lower priority to the threshold
// (value >= threshold), the connection is considered to be in backround mode.
folly::Optional<PriorityLevel> backgroundPriorityThreshold_;
folly::Optional<float> backgroundUtilizationFactor_;
};
std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt);

View File

@@ -19,6 +19,7 @@
#include <quic/server/state/ServerStateMachine.h>
#include <quic/state/DatagramHandlers.h>
#include <quic/state/QuicStreamFunctions.h>
#include <quic/state/QuicStreamUtilities.h>
#include <quic/state/test/Mocks.h>
#include <folly/io/async/test/MockAsyncUDPSocket.h>
@@ -3870,5 +3871,82 @@ TEST_F(QuicTransportImplTest, Cmsgs) {
transport->appendCmsgs(cmsgs);
}
TEST_F(QuicTransportImplTest, BackgroundModeChangeWithStreamChanges) {
// Verify that background mode is correctly turned on and off
// based upon stream creation, priority changes, stream removal.
// For different steps try local (uni/bi)directional streams and remote
// streams
InSequence s;
auto& conn = transport->getConnectionState();
auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get();
conn.congestionController = std::move(mockCongestionController);
auto& manager = *conn.streamManager;
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(_))
.Times(0); // Backgound params not set
auto stream = manager.createNextUnidirectionalStream().value();
manager.setStreamPriority(stream->id, 1, false);
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
.Times(1); // On setting the background params
transport->setBackgroundModeParameters(1, 0.5);
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
.Times(1); // On removing a closed stream
stream->sendState = StreamSendState::Closed;
stream->recvState = StreamRecvState::Closed;
manager.removeClosedStream(stream->id);
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
.Times(2); // On stream creation - create two streams - one bidirectional
auto stream2Id = manager.createNextUnidirectionalStream().value()->id;
auto stream3id = manager.createNextBidirectionalStream().value()->id;
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
.Times(1); // On increasing the priority of one of the streams
manager.setStreamPriority(stream3id, 0, false);
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
.Times(1); // a new lower priority stream does not affect the utlization
// factor
auto streamLower = manager.createNextBidirectionalStream().value();
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
.Times(1); // On removing a closed stream
streamLower->sendState = StreamSendState::Closed;
streamLower->recvState = StreamRecvState::Closed;
manager.removeClosedStream(streamLower->id);
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
.Times(1); // On removing a closed stream
CHECK_NOTNULL(manager.getStream(stream3id))->sendState =
StreamSendState::Closed;
CHECK_NOTNULL(manager.getStream(stream3id))->recvState =
StreamRecvState::Closed;
manager.removeClosedStream(stream3id);
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
.Times(1); // On stream creation - remote stream
auto peerStreamId = 20;
ASSERT_TRUE(isRemoteStream(conn.nodeType, peerStreamId));
auto stream4 = manager.getStream(peerStreamId);
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
.Times(1); // On clearing the background parameters
transport->clearBackgroundModeParameters();
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(_))
.Times(0); // Background params not set
stream4->sendState = StreamSendState::Closed;
stream4->recvState = StreamRecvState::Closed;
manager.removeClosedStream(stream4->id);
CHECK_NOTNULL(manager.getStream(stream2Id))->sendState =
StreamSendState::Closed;
CHECK_NOTNULL(manager.getStream(stream2Id))->recvState =
StreamRecvState::Closed;
manager.removeClosedStream(stream2Id);
}
} // namespace test
} // namespace quic

View File

@@ -236,6 +236,15 @@ bool QuicStreamManager::setStreamPriority(
return false;
}
stream->priority = newPriority;
auto priorityMapEntry = streamPriorityLevels_.find(id);
if (priorityMapEntry == streamPriorityLevels_.end()) {
throw QuicTransportException(
"Active stream not in stream priority map",
TransportErrorCode::STREAM_STATE_ERROR);
} else {
priorityMapEntry->second = newPriority.level;
}
notifyStreamPriorityChanges();
// If this stream is already in the writable or loss queus, update the
// priority there.
writableStreams_.updateIfExist(id, stream->priority);
@@ -273,6 +282,7 @@ QuicStreamManager::getOrCreateOpenedLocalStream(StreamId streamId) {
throw QuicTransportException(
"Creating an active stream", TransportErrorCode::STREAM_STATE_ERROR);
}
addToStreamPriorityMap(it.first->second);
return &it.first->second;
}
return nullptr;
@@ -358,6 +368,7 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) {
std::piecewise_construct,
std::forward_as_tuple(streamId),
std::forward_as_tuple(streamId, conn_));
addToStreamPriorityMap(it.first->second);
QUIC_STATS(conn_.statsCallback, onNewQuicStream);
return &it.first->second;
}
@@ -386,6 +397,7 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) {
std::piecewise_construct,
std::forward_as_tuple(streamId),
std::forward_as_tuple(streamId, conn_));
addToStreamPriorityMap(it.first->second);
QUIC_STATS(conn_.statsCallback, onNewQuicStream);
return &it.first->second;
}
@@ -423,6 +435,7 @@ QuicStreamManager::createStream(StreamId streamId) {
std::piecewise_construct,
std::forward_as_tuple(streamId),
std::forward_as_tuple(streamId, conn_));
addToStreamPriorityMap(it.first->second);
QUIC_STATS(conn_.statsCallback, onNewQuicStream);
updateAppIdleState();
return &it.first->second;
@@ -448,6 +461,15 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
windowUpdates_.erase(streamId);
stopSendingStreams_.erase(streamId);
flowControlUpdated_.erase(streamId);
{
const auto streamPriorityIt = streamPriorityLevels_.find(streamId);
if (streamPriorityIt == streamPriorityLevels_.end()) {
throw QuicTransportException(
"Removed stream is not in the priority map",
TransportErrorCode::STREAM_STATE_ERROR);
}
streamPriorityLevels_.erase(streamPriorityIt);
}
if (it->second.isControl) {
DCHECK_GT(numControlStreams_, 0);
numControlStreams_--;
@@ -495,6 +517,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
openLocalStreams.erase(streamId);
}
updateAppIdleState();
notifyStreamPriorityChanges();
}
void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) {
@@ -567,4 +590,59 @@ bool QuicStreamManager::isAppIdle() const {
return isAppIdle_;
}
PriorityLevel QuicStreamManager::getHighestPriorityLevel() const {
// Highest priority is minimum value
auto min = kDefaultMaxPriority;
for (auto& entry : streamPriorityLevels_) {
if (entry.second < min) {
min = entry.second;
}
if (min == 0) {
break;
}
}
return min;
}
void QuicStreamManager::setPriorityChangesObserver(
QuicStreamPrioritiesObserver* observer) {
priorityChangesObserver_ = observer;
}
void QuicStreamManager::resetPriorityChangesObserver() {
if (!priorityChangesObserver_) {
return;
}
priorityChangesObserver_ = nullptr;
}
void QuicStreamManager::notifyStreamPriorityChanges() {
if (priorityChangesObserver_) {
priorityChangesObserver_->onStreamPrioritiesChange();
}
}
void QuicStreamManager::addToStreamPriorityMap(
const QuicStreamState& streamState) {
auto entry = streamPriorityLevels_.emplace(
streamState.id, PriorityLevel(streamState.priority.level));
// Verify stream didn't already exist in streamPriorityLevels_
if (!entry.second) {
throw QuicTransportException(
"Attempted to add stream already in priority map",
TransportErrorCode::STREAM_STATE_ERROR);
}
// Verify inserted item
if (entry.first->second != PriorityLevel(streamState.priority.level)) {
throw QuicTransportException(
"Failed to add stream to priority map",
TransportErrorCode::STREAM_STATE_ERROR);
}
// Notify observer (if set)
notifyStreamPriorityChanges();
}
} // namespace quic

View File

@@ -12,6 +12,7 @@
#include <folly/container/F14Set.h>
#include <quic/QuicConstants.h>
#include <quic/codec/Types.h>
#include <quic/state/QuicStreamPrioritiesObserver.h>
#include <quic/state/StreamData.h>
#include <quic/state/TransportSettings.h>
#include <numeric>
@@ -110,6 +111,7 @@ class QuicStreamManager {
newPeerStreams_ = std::move(other.newPeerStreams_);
blockedStreams_ = std::move(other.blockedStreams_);
stopSendingStreams_ = std::move(other.stopSendingStreams_);
streamPriorityLevels_ = std::move(other.streamPriorityLevels_);
windowUpdates_ = std::move(other.windowUpdates_);
flowControlUpdated_ = std::move(other.flowControlUpdated_);
lossStreams_ = std::move(other.lossStreams_);
@@ -837,6 +839,23 @@ class QuicStreamManager {
bool isAppIdle() const;
/*
* Sets an observer that will be notified whenever the set of stream
* priorities changes
*/
void setPriorityChangesObserver(QuicStreamPrioritiesObserver* observer);
/*
* Stops notifications for changes to the set of stream priorities
*/
void resetPriorityChangesObserver();
/*
* Returns the highest priority level used by any stream
* (Highest priority is lowest value)
*/
[[nodiscard]] PriorityLevel getHighestPriorityLevel() const;
private:
// Updates the congestion controller app-idle state, after a change in the
// number of streams.
@@ -858,6 +877,9 @@ class QuicStreamManager {
uint64_t maxStreams,
bool force);
void addToStreamPriorityMap(const QuicStreamState& streamState);
void notifyStreamPriorityChanges();
QuicConnectionStateBase& conn_;
QuicNodeType nodeType_;
@@ -937,6 +959,9 @@ class QuicStreamManager {
// Map of streams where the peer was asked to stop sending
folly::F14FastMap<StreamId, ApplicationErrorCode> stopSendingStreams_;
// Map of stream priority levels
folly::F14FastMap<StreamId, PriorityLevel> streamPriorityLevels_;
// Streams that had their stream window change and potentially need a window
// update sent
folly::F14FastSet<StreamId> windowUpdates_;
@@ -969,6 +994,9 @@ class QuicStreamManager {
// Streams that are closed but we still have state for
folly::F14FastSet<StreamId> closedStreams_;
// Observer to notify on changes in the streamPriorityLevels_ map
QuicStreamPrioritiesObserver* priorityChangesObserver_{nullptr};
// Record whether or not we are app-idle.
bool isAppIdle_{false};

View File

@@ -0,0 +1,20 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
#pragma once
namespace quic {
class QuicStreamPrioritiesObserver {
public:
virtual ~QuicStreamPrioritiesObserver() = default;
virtual void onStreamPrioritiesChange() = 0;
};
} // namespace quic

View File

@@ -64,5 +64,10 @@ class MockPendingPathRateLimiter : public PendingPathRateLimiter {
currentCredit,
uint64_t(TimePoint, std::chrono::microseconds));
};
class MockQuicStreamPrioritiesObserver : public QuicStreamPrioritiesObserver {
public:
MOCK_METHOD0(onStreamPrioritiesChange, void());
};
} // namespace test
} // namespace quic

View File

@@ -11,7 +11,9 @@
#include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
#include <quic/server/state/ServerStateMachine.h>
#include <quic/state/QuicPriorityQueue.h>
#include <quic/state/QuicStreamManager.h>
#include <quic/state/QuicStreamUtilities.h>
#include <quic/state/test/Mocks.h>
using namespace testing;
@@ -338,5 +340,71 @@ TEST_F(QuicStreamManagerTest, WriteBufferMeta) {
EXPECT_TRUE(manager.writableDSRStreams().empty());
}
TEST_F(QuicStreamManagerTest, NotifyOnStreamPriorityChanges) {
// Verify that the StreamPriorityChanges callback function is called
// upon stream creation, priority changes, stream removal.
// For different steps try local (uni/bi)directional streams and remote
// streams
MockQuicStreamPrioritiesObserver mObserver;
auto& manager = *conn.streamManager;
manager.setPriorityChangesObserver(&mObserver);
EXPECT_CALL(mObserver, onStreamPrioritiesChange())
.Times(2); // On stream creation and on setting the priority
auto stream = manager.createNextUnidirectionalStream().value();
EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level);
manager.setStreamPriority(stream->id, 1, false);
EXPECT_EQ(manager.getHighestPriorityLevel(), 1);
EXPECT_CALL(mObserver, onStreamPrioritiesChange())
.Times(1); // On removing a closed stream
stream->sendState = StreamSendState::Closed;
stream->recvState = StreamRecvState::Closed;
manager.removeClosedStream(stream->id);
// No active stream. Highest priority should return the max value (least
// priority).
EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultMaxPriority);
EXPECT_CALL(mObserver, onStreamPrioritiesChange())
.Times(2); // On stream creation - create two streams - one bidirectional
auto stream2Id = manager.createNextUnidirectionalStream().value()->id;
auto stream3 = manager.createNextBidirectionalStream().value();
EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level);
EXPECT_CALL(mObserver, onStreamPrioritiesChange())
.Times(1); // On increasing the priority of one of the streams
manager.setStreamPriority(stream3->id, 0, false);
EXPECT_EQ(manager.getHighestPriorityLevel(), 0);
EXPECT_CALL(mObserver, onStreamPrioritiesChange())
.Times(1); // On removing a closed stream;
stream3->sendState = StreamSendState::Closed;
stream3->recvState = StreamRecvState::Closed;
manager.removeClosedStream(stream3->id);
EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level);
EXPECT_CALL(mObserver, onStreamPrioritiesChange())
.Times(1); // On stream creation - remote stream
auto peerStreamId = 20;
ASSERT_TRUE(isRemoteStream(conn.nodeType, peerStreamId));
auto stream4 = manager.getStream(peerStreamId);
EXPECT_NE(stream4, nullptr);
EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level);
EXPECT_CALL(mObserver, onStreamPrioritiesChange())
.Times(0); // Removing streams but observer removed
manager.resetPriorityChangesObserver();
stream4->sendState = StreamSendState::Closed;
stream4->recvState = StreamRecvState::Closed;
manager.removeClosedStream(stream4->id);
CHECK_NOTNULL(manager.getStream(stream2Id))->sendState =
StreamSendState::Closed;
CHECK_NOTNULL(manager.getStream(stream2Id))->recvState =
StreamRecvState::Closed;
manager.removeClosedStream(stream2Id);
}
} // namespace test
} // namespace quic