mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-08 09:42:06 +03:00
Fix app limited to account for peer created streams
Summary: Previously updateAppLimited was not accounting for peer created streams. This moves app limited accounting into the stream manager This also makes a subtle change to the applimited callback. We assume that we start off as not app limited now, so if we create a stream we will not call the congestion controller app limited callback. Reviewed By: mjoras Differential Revision: D15166114 fbshipit-source-id: 1a8d573533861f53bb1bd9fdc605dfefe68902dc
This commit is contained in:
committed by
Facebook Github Bot
parent
ba1ce44645
commit
1d9e4034c9
@@ -1519,7 +1519,6 @@ QuicTransportBase::createStreamInternal(bool bidirectional) {
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
bool hadNonCtrlStreams = conn_->streamManager->hasNonCtrlStreams();
|
||||
folly::Expected<QuicStreamState*, LocalErrorCode> streamResult;
|
||||
if (bidirectional) {
|
||||
streamResult = conn_->streamManager->createNextBidirectionalStream();
|
||||
@@ -1527,7 +1526,6 @@ QuicTransportBase::createStreamInternal(bool bidirectional) {
|
||||
streamResult = conn_->streamManager->createNextUnidirectionalStream();
|
||||
}
|
||||
if (streamResult) {
|
||||
updateAppLimitedState(hadNonCtrlStreams);
|
||||
return streamResult.value()->id;
|
||||
} else {
|
||||
return folly::makeUnexpected(streamResult.error());
|
||||
@@ -1840,7 +1838,6 @@ void QuicTransportBase::checkForClosedStream() {
|
||||
return;
|
||||
}
|
||||
auto itr = conn_->streamManager->closedStreams().begin();
|
||||
bool hadNonCtrlStreams = conn_->streamManager->hasNonCtrlStreams();
|
||||
while (itr != conn_->streamManager->closedStreams().end()) {
|
||||
auto callbackIt = readCallbacks_.find(*itr);
|
||||
// We might be in the process of delivering all the delivery callbacks for
|
||||
@@ -1868,7 +1865,6 @@ void QuicTransportBase::checkForClosedStream() {
|
||||
conn_->streamManager->streamCount() == 0) {
|
||||
closeImpl(folly::none);
|
||||
}
|
||||
updateAppLimitedState(hadNonCtrlStreams);
|
||||
}
|
||||
|
||||
void QuicTransportBase::sendPing(
|
||||
@@ -2195,26 +2191,13 @@ void QuicTransportBase::detachEventBase() {
|
||||
evb_ = nullptr;
|
||||
}
|
||||
|
||||
void QuicTransportBase::updateAppLimitedState(bool hadNonCtrlStreams) {
|
||||
bool hasNonCtrlStreams = conn_->streamManager->hasNonCtrlStreams();
|
||||
if (hadNonCtrlStreams == hasNonCtrlStreams) {
|
||||
return;
|
||||
}
|
||||
if (conn_->congestionController) {
|
||||
conn_->congestionController->setAppLimited(
|
||||
hadNonCtrlStreams && !hasNonCtrlStreams, Clock::now());
|
||||
}
|
||||
}
|
||||
|
||||
folly::Optional<LocalErrorCode> QuicTransportBase::setControlStream(
|
||||
StreamId id) {
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
return LocalErrorCode::STREAM_NOT_EXISTS;
|
||||
}
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));
|
||||
bool hadNonCtrlStreams = conn_->streamManager->hasNonCtrlStreams();
|
||||
conn_->streamManager->setStreamAsControl(*stream);
|
||||
updateAppLimitedState(hadNonCtrlStreams);
|
||||
return folly::none;
|
||||
}
|
||||
|
||||
|
@@ -223,14 +223,6 @@ class QuicTransportBase : public QuicSocket {
|
||||
|
||||
folly::Optional<LocalErrorCode> setControlStream(StreamId id) override;
|
||||
|
||||
// Updates the congestion controller app limited state, after a change in the
|
||||
// number of streams.
|
||||
// App limited state is set to true if there was at least one non-control
|
||||
// before the update and there are none after. It is set to false if instead
|
||||
// there were no non-control streams before and there is at least one at the
|
||||
// time of calling
|
||||
void updateAppLimitedState(bool hadNonCtrlStreams);
|
||||
|
||||
/**
|
||||
* Invoke onCanceled for all the delivery callbacks in the deliveryCallbacks
|
||||
* passed in. This is supposed to be a copy of the real deque of the delivery
|
||||
|
@@ -1472,10 +1472,10 @@ TEST_P(QuicTransportImplTestUniBidi, AppLimitedTest) {
|
||||
auto rawCongestionController = mockCongestionController.get();
|
||||
conn.congestionController = std::move(mockCongestionController);
|
||||
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(false, _)).Times(1);
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(false, _)).Times(0);
|
||||
auto stream = createStream(transport, GetParam());
|
||||
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(true, _)).Times(1);
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(true, _));
|
||||
transport->closeStream(stream);
|
||||
}
|
||||
|
||||
@@ -1485,7 +1485,7 @@ TEST_P(QuicTransportImplTestUniBidi, AppLimitedTestControlStreams) {
|
||||
auto rawCongestionController = mockCongestionController.get();
|
||||
conn.congestionController = std::move(mockCongestionController);
|
||||
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(false, _)).Times(1);
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(false, _)).Times(0);
|
||||
auto stream = createStream(transport, GetParam());
|
||||
ASSERT_TRUE(stream);
|
||||
|
||||
@@ -1496,7 +1496,7 @@ TEST_P(QuicTransportImplTestUniBidi, AppLimitedTestControlStreams) {
|
||||
ASSERT_TRUE(ctrlStream2);
|
||||
transport->setControlStream(ctrlStream2);
|
||||
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(true, _)).Times(1);
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(true, _));
|
||||
transport->closeStream(stream);
|
||||
}
|
||||
|
||||
@@ -1506,7 +1506,6 @@ TEST_P(QuicTransportImplTestUniBidi, AppLimitedTestOnlyControlStreams) {
|
||||
auto rawCongestionController = mockCongestionController.get();
|
||||
conn.congestionController = std::move(mockCongestionController);
|
||||
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(false, _)).Times(1);
|
||||
auto ctrlStream1 = createStream(transport, GetParam());
|
||||
EXPECT_CALL(*rawCongestionController, setAppLimited(true, _)).Times(1);
|
||||
transport->setControlStream(ctrlStream1);
|
||||
|
@@ -152,7 +152,9 @@ QuicStreamManager::getOrCreateOpenedLocalStream(StreamId streamId) {
|
||||
|
||||
QuicStreamState* QuicStreamManager::getStream(StreamId streamId) {
|
||||
if (isRemoteStream(nodeType_, streamId)) {
|
||||
return getOrCreatePeerStream(streamId);
|
||||
auto stream = getOrCreatePeerStream(streamId);
|
||||
updateAppLimitedState();
|
||||
return stream;
|
||||
}
|
||||
auto it = streams_.find(streamId);
|
||||
if (it != streams_.end()) {
|
||||
@@ -167,6 +169,7 @@ QuicStreamState* QuicStreamManager::getStream(StreamId streamId) {
|
||||
"Trying to get unopened local stream",
|
||||
TransportErrorCode::STREAM_STATE_ERROR);
|
||||
}
|
||||
updateAppLimitedState();
|
||||
return stream;
|
||||
}
|
||||
|
||||
@@ -295,6 +298,7 @@ QuicStreamManager::createStream(StreamId streamId) {
|
||||
std::forward_as_tuple(streamId),
|
||||
std::forward_as_tuple(streamId, conn_));
|
||||
QUIC_STATS(conn_.infoCallback, onNewQuicStream);
|
||||
updateAppLimitedState();
|
||||
return &it.first->second;
|
||||
}
|
||||
|
||||
@@ -338,6 +342,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
|
||||
openLocalStreams_.erase(streamItr);
|
||||
}
|
||||
}
|
||||
updateAppLimitedState();
|
||||
}
|
||||
|
||||
void QuicStreamManager::updateLossStreams(QuicStreamState& stream) {
|
||||
@@ -398,4 +403,30 @@ void QuicStreamManager::updatePeekableStreams(QuicStreamState& stream) {
|
||||
}
|
||||
}
|
||||
|
||||
void QuicStreamManager::updateAppLimitedState() {
|
||||
bool currentNonCtrlStreams = hasNonCtrlStreams();
|
||||
if (isAppLimited_ && !currentNonCtrlStreams) {
|
||||
// We were app limited, and we continue to be app limited.
|
||||
return;
|
||||
} else if (!isAppLimited_ && currentNonCtrlStreams) {
|
||||
// We were not app limited, and we continue to be not app limited.
|
||||
return;
|
||||
}
|
||||
isAppLimited_ = !currentNonCtrlStreams;
|
||||
if (conn_.congestionController) {
|
||||
conn_.congestionController->setAppLimited(isAppLimited_, Clock::now());
|
||||
}
|
||||
}
|
||||
|
||||
void QuicStreamManager::setStreamAsControl(QuicStreamState& stream) {
|
||||
if (!stream.isControl) {
|
||||
stream.isControl = true;
|
||||
numControlStreams_++;
|
||||
}
|
||||
updateAppLimitedState();
|
||||
}
|
||||
|
||||
bool QuicStreamManager::isAppLimited() const {
|
||||
return isAppLimited_;
|
||||
}
|
||||
} // namespace quic
|
||||
|
@@ -548,12 +548,7 @@ class QuicStreamManager {
|
||||
/*
|
||||
* Sets the given stream to be tracked as a control stream.
|
||||
*/
|
||||
void setStreamAsControl(QuicStreamState& stream) {
|
||||
if (!stream.isControl) {
|
||||
stream.isControl = true;
|
||||
numControlStreams_++;
|
||||
}
|
||||
}
|
||||
void setStreamAsControl(QuicStreamState& stream);
|
||||
|
||||
/*
|
||||
* Clear the tracking of streams which can trigger API callbacks.
|
||||
@@ -566,7 +561,17 @@ class QuicStreamManager {
|
||||
dataRejectedStreams_.clear();
|
||||
}
|
||||
|
||||
bool isAppLimited() const;
|
||||
|
||||
private:
|
||||
// Updates the congestion controller app limited state, after a change in the
|
||||
// number of streams.
|
||||
// App limited state is set to true if there was at least one non-control
|
||||
// before the update and there are none after. It is set to false if instead
|
||||
// there were no non-control streams before and there is at least one at the
|
||||
// time of calling
|
||||
void updateAppLimitedState();
|
||||
|
||||
QuicStreamState* FOLLY_NULLABLE
|
||||
getOrCreateOpenedLocalStream(StreamId streamId);
|
||||
|
||||
@@ -658,6 +663,9 @@ class QuicStreamManager {
|
||||
|
||||
// Data structure to keep track of stream that have detected lost data
|
||||
std::vector<StreamId> lossStreams_;
|
||||
|
||||
// Record whether or not we are app limited.
|
||||
bool isAppLimited_{false};
|
||||
};
|
||||
|
||||
} // namespace quic
|
||||
|
@@ -26,7 +26,6 @@ class MockCongestionController : public CongestionController {
|
||||
MOCK_CONST_METHOD0(getCongestionWindow, uint64_t());
|
||||
MOCK_METHOD0(onSpuriousLoss, void());
|
||||
GMOCK_METHOD1_(, , , setConnectionEmulation, void(uint8_t));
|
||||
MOCK_METHOD1(setApplicationLimited, void(bool));
|
||||
MOCK_CONST_METHOD0(canBePaced, bool());
|
||||
MOCK_CONST_METHOD0(type, CongestionControlType());
|
||||
GMOCK_METHOD1_(, , , getPacingRate, uint64_t(TimePoint));
|
||||
|
179
quic/state/test/QuicStreamManagerTest.cpp
Normal file
179
quic/state/test/QuicStreamManagerTest.cpp
Normal file
@@ -0,0 +1,179 @@
|
||||
/*
|
||||
* Copyright (c) Facebook, Inc. and its affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*
|
||||
*/
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <quic/server/state/ServerStateMachine.h>
|
||||
#include <quic/state/QuicStreamManager.h>
|
||||
#include <quic/state/test/Mocks.h>
|
||||
|
||||
using namespace folly;
|
||||
using namespace testing;
|
||||
|
||||
namespace quic {
|
||||
namespace test {
|
||||
|
||||
class QuicStreamManagerTest : public Test {
|
||||
public:
|
||||
void SetUp() override {
|
||||
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiLocal =
|
||||
kDefaultStreamWindowSize;
|
||||
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote =
|
||||
kDefaultStreamWindowSize;
|
||||
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetUni =
|
||||
kDefaultStreamWindowSize;
|
||||
conn.flowControlState.peerAdvertisedMaxOffset =
|
||||
kDefaultConnectionWindowSize;
|
||||
conn.streamManager->setMaxLocalBidirectionalStreams(
|
||||
kDefaultMaxStreamsBidirectional);
|
||||
conn.streamManager->setMaxLocalUnidirectionalStreams(
|
||||
kDefaultMaxStreamsUnidirectional);
|
||||
auto congestionController =
|
||||
std::make_unique<NiceMock<MockCongestionController>>();
|
||||
mockController = congestionController.get();
|
||||
conn.congestionController = std::move(congestionController);
|
||||
}
|
||||
|
||||
QuicServerConnectionState conn;
|
||||
MockCongestionController* mockController;
|
||||
};
|
||||
|
||||
TEST_F(QuicStreamManagerTest, TestAppLimitedCreateBidiStream) {
|
||||
auto& manager = *conn.streamManager;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
// The app limiited state did not change.
|
||||
EXPECT_CALL(*mockController, setAppLimited(false, _)).Times(0);
|
||||
auto stream = manager.createNextBidirectionalStream();
|
||||
StreamId id = stream.value()->id;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(true, _));
|
||||
// Force transition to closed state
|
||||
stream.value()->state = StreamStates::Closed();
|
||||
manager.removeClosedStream(stream.value()->id);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
EXPECT_EQ(manager.getStream(id), nullptr);
|
||||
}
|
||||
|
||||
TEST_F(QuicStreamManagerTest, TestAppLimitedCreateUnidiStream) {
|
||||
auto& manager = *conn.streamManager;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
EXPECT_CALL(*mockController, setAppLimited(false, _)).Times(0);
|
||||
auto stream = manager.createNextUnidirectionalStream();
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
// Force transition to closed state
|
||||
EXPECT_CALL(*mockController, setAppLimited(true, _));
|
||||
stream.value()->state = StreamStates::Closed();
|
||||
manager.removeClosedStream(stream.value()->id);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
}
|
||||
|
||||
TEST_F(QuicStreamManagerTest, TestAppLimitedExistingLocalStream) {
|
||||
auto& manager = *conn.streamManager;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
EXPECT_CALL(*mockController, setAppLimited(false, _)).Times(0);
|
||||
|
||||
auto stream = manager.createNextUnidirectionalStream();
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(true, _));
|
||||
manager.setStreamAsControl(*stream.value());
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
|
||||
manager.getStream(stream.value()->id);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
}
|
||||
|
||||
TEST_F(QuicStreamManagerTest, TestAppLimitedStreamAsControl) {
|
||||
auto& manager = *conn.streamManager;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
auto stream = manager.createNextUnidirectionalStream();
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(true, _));
|
||||
manager.setStreamAsControl(*stream.value());
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(false, _));
|
||||
manager.createNextUnidirectionalStream();
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
}
|
||||
|
||||
TEST_F(QuicStreamManagerTest, TestAppLimitedCreatePeerStream) {
|
||||
auto& manager = *conn.streamManager;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
StreamId id = 0;
|
||||
auto stream = manager.getStream(id);
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(true, _));
|
||||
manager.setStreamAsControl(*stream);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(false, _));
|
||||
StreamId id2 = 4;
|
||||
manager.getStream(id2);
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
}
|
||||
|
||||
TEST_F(QuicStreamManagerTest, TestAppLimitedExistingPeerStream) {
|
||||
auto& manager = *conn.streamManager;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
EXPECT_CALL(*mockController, setAppLimited(false, _)).Times(0);
|
||||
|
||||
StreamId id = 0;
|
||||
auto stream = manager.getStream(id);
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(true, _));
|
||||
manager.setStreamAsControl(*stream);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
|
||||
manager.getStream(id);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
}
|
||||
|
||||
TEST_F(QuicStreamManagerTest, TestAppLimitedClosePeerStream) {
|
||||
auto& manager = *conn.streamManager;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
StreamId id = 0;
|
||||
auto stream = manager.getStream(id);
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(true, _));
|
||||
// Force transition to closed state
|
||||
stream->state = StreamStates::Closed();
|
||||
manager.removeClosedStream(stream->id);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
EXPECT_EQ(manager.getStream(id), nullptr);
|
||||
}
|
||||
|
||||
TEST_F(QuicStreamManagerTest, TestAppLimitedCloseControlStream) {
|
||||
auto& manager = *conn.streamManager;
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
EXPECT_CALL(*mockController, setAppLimited(false, _)).Times(0);
|
||||
|
||||
StreamId id = 0;
|
||||
auto stream = manager.getStream(id);
|
||||
EXPECT_FALSE(manager.isAppLimited());
|
||||
|
||||
EXPECT_CALL(*mockController, setAppLimited(true, _));
|
||||
manager.setStreamAsControl(*stream);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
|
||||
// Force transition to closed state
|
||||
stream->state = StreamStates::Closed();
|
||||
manager.removeClosedStream(stream->id);
|
||||
EXPECT_TRUE(manager.isAppLimited());
|
||||
}
|
||||
} // namespace test
|
||||
} // namespace quic
|
Reference in New Issue
Block a user