1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Add a flag to notify user on new streams explicitly

Summary:
Current behavior is to create streams "lazily", that is pre-allocate stream ids that are lower than the stream id received in stream frame, even though we have not seen the lower streams yet. The `onNew(Bi|Uni)directionalStream()` callbacks are also raised implicitly, that is on lower stream ids pre-allocation, instead of when we actually get a stream frame with those lower ids.

With the new flag the callbacks will be raised to the user on actual arrival of the frame with the stream id on the wire.

Reviewed By: mjoras

Differential Revision: D36465978

fbshipit-source-id: 91faaadaad106a2bf62348c8fe8f00b3d7c61c2c
This commit is contained in:
Konstantin Tsoy
2022-05-19 20:01:28 -07:00
committed by Facebook GitHub Bot
parent e612a31218
commit e66b3b25cb
5 changed files with 299 additions and 187 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -67,7 +67,7 @@ static LocalErrorCode openPeerStreamIfNotClosed(
folly::F14FastSet<StreamId>& openStreams,
StreamId& nextAcceptableStreamId,
StreamId maxStreamId,
std::vector<StreamId>& newStreams) {
std::vector<StreamId>* newStreams) {
if (streamId < nextAcceptableStreamId) {
return LocalErrorCode::CREATING_EXISTING_STREAM;
}
@@ -78,10 +78,14 @@ static LocalErrorCode openPeerStreamIfNotClosed(
StreamId start = nextAcceptableStreamId;
auto numNewStreams = (streamId - start) / detail::kStreamIncrement;
openStreams.reserve(openStreams.size() + numNewStreams);
newStreams.reserve(newStreams.size() + numNewStreams);
if (newStreams) {
newStreams->reserve(newStreams->size() + numNewStreams);
}
while (start <= streamId) {
openStreams.emplace(start);
newStreams.push_back(start);
if (newStreams) {
newStreams->push_back(start);
}
start += detail::kStreamIncrement;
}
@@ -330,6 +334,20 @@ QuicStreamManager::createNextUnidirectionalStream() {
return stream;
}
QuicStreamState* FOLLY_NULLABLE
QuicStreamManager::instantiatePeerStream(StreamId streamId) {
if (transportSettings_->notifyOnNewStreamsExplicitly) {
newPeerStreams_.push_back(streamId);
}
auto it = streams_.emplace(
std::piecewise_construct,
std::forward_as_tuple(streamId),
std::forward_as_tuple(streamId, conn_));
addToStreamPriorityMap(it.first->second);
QUIC_STATS(conn_.statsCallback, onNewQuicStream);
return &it.first->second;
}
QuicStreamState* FOLLY_NULLABLE
QuicStreamManager::getOrCreatePeerStream(StreamId streamId) {
// This function maintains 3 invariants:
@@ -365,13 +383,7 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) {
: openBidirectionalPeerStreams_;
if (openPeerStreams.count(streamId)) {
// Stream was already open, create the state for it lazily.
auto it = streams_.emplace(
std::piecewise_construct,
std::forward_as_tuple(streamId),
std::forward_as_tuple(streamId, conn_));
addToStreamPriorityMap(it.first->second);
QUIC_STATS(conn_.statsCallback, onNewQuicStream);
return &it.first->second;
return instantiatePeerStream(streamId);
}
auto& nextAcceptableStreamId = isUnidirectionalStream(streamId)
@@ -385,7 +397,9 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) {
openPeerStreams,
nextAcceptableStreamId,
maxStreamId,
newPeerStreams_);
(transportSettings_->notifyOnNewStreamsExplicitly ? nullptr
: &newPeerStreams_));
if (openedResult == LocalErrorCode::CREATING_EXISTING_STREAM) {
// Stream could be closed here.
return nullptr;
@@ -394,13 +408,7 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) {
"Exceeded stream limit.", TransportErrorCode::STREAM_LIMIT_ERROR);
}
auto it = streams_.emplace(
std::piecewise_construct,
std::forward_as_tuple(streamId),
std::forward_as_tuple(streamId, conn_));
addToStreamPriorityMap(it.first->second);
QUIC_STATS(conn_.statsCallback, onNewQuicStream);
return &it.first->second;
return instantiatePeerStream(streamId);
}
folly::Expected<QuicStreamState*, LocalErrorCode>

View File

@@ -940,6 +940,9 @@ class QuicStreamManager {
void addToStreamPriorityMap(const QuicStreamState& streamState);
void notifyStreamPriorityChanges();
// helper to create a new peer stream.
QuicStreamState* FOLLY_NULLABLE instantiatePeerStream(StreamId streamId);
QuicConnectionStateBase& conn_;
QuicNodeType nodeType_;

View File

@@ -295,6 +295,9 @@ struct TransportSettings {
bool enableWritableBytesLimit{false};
// Whether or not to remove data from the loss buffer on spurious loss.
bool removeFromLossBufferOnSpurious{false};
// If set to true, the users won't get new stream notification until an
// actual stream frame with the new stream id arrives.
bool notifyOnNewStreamsExplicitly{false};
};
} // namespace quic

View File

@@ -20,7 +20,13 @@ using namespace testing;
namespace quic {
namespace test {
class QuicStreamManagerTest : public Test {
struct StreamManagerTestParam {
bool notifyOnNewStreamsExplicitly;
};
class QuicStreamManagerTest
: public Test,
public WithParamInterface<StreamManagerTestParam> {
public:
QuicStreamManagerTest()
: conn(FizzServerQuicHandshakeContext::Builder().build()) {}
@@ -41,13 +47,17 @@ class QuicStreamManagerTest : public Test {
std::make_unique<NiceMock<MockCongestionController>>();
mockController = congestionController.get();
conn.congestionController = std::move(congestionController);
conn.transportSettings.notifyOnNewStreamsExplicitly =
GetParam().notifyOnNewStreamsExplicitly;
conn.streamManager->refreshTransportSettings(conn.transportSettings);
}
QuicServerConnectionState conn;
MockCongestionController* mockController;
};
TEST_F(QuicStreamManagerTest, SkipRedundantPriorityUpdate) {
TEST_P(QuicStreamManagerTest, SkipRedundantPriorityUpdate) {
auto& manager = *conn.streamManager;
auto stream = manager.createNextBidirectionalStream();
auto streamId = stream.value()->id;
@@ -62,7 +72,7 @@ TEST_F(QuicStreamManagerTest, SkipRedundantPriorityUpdate) {
!currentPriority.incremental));
}
TEST_F(QuicStreamManagerTest, TestAppIdleCreateBidiStream) {
TEST_P(QuicStreamManagerTest, TestAppIdleCreateBidiStream) {
auto& manager = *conn.streamManager;
EXPECT_FALSE(manager.isAppIdle());
@@ -81,7 +91,7 @@ TEST_F(QuicStreamManagerTest, TestAppIdleCreateBidiStream) {
EXPECT_EQ(manager.getStream(id), nullptr);
}
TEST_F(QuicStreamManagerTest, TestAppIdleCreateUnidiStream) {
TEST_P(QuicStreamManagerTest, TestAppIdleCreateUnidiStream) {
auto& manager = *conn.streamManager;
EXPECT_FALSE(manager.isAppIdle());
EXPECT_CALL(*mockController, setAppIdle(false, _)).Times(0);
@@ -96,7 +106,7 @@ TEST_F(QuicStreamManagerTest, TestAppIdleCreateUnidiStream) {
EXPECT_TRUE(manager.isAppIdle());
}
TEST_F(QuicStreamManagerTest, TestAppIdleExistingLocalStream) {
TEST_P(QuicStreamManagerTest, TestAppIdleExistingLocalStream) {
auto& manager = *conn.streamManager;
EXPECT_FALSE(manager.isAppIdle());
EXPECT_CALL(*mockController, setAppIdle(false, _)).Times(0);
@@ -112,7 +122,7 @@ TEST_F(QuicStreamManagerTest, TestAppIdleExistingLocalStream) {
EXPECT_TRUE(manager.isAppIdle());
}
TEST_F(QuicStreamManagerTest, TestAppIdleStreamAsControl) {
TEST_P(QuicStreamManagerTest, TestAppIdleStreamAsControl) {
auto& manager = *conn.streamManager;
EXPECT_FALSE(manager.isAppIdle());
@@ -128,7 +138,7 @@ TEST_F(QuicStreamManagerTest, TestAppIdleStreamAsControl) {
EXPECT_FALSE(manager.isAppIdle());
}
TEST_F(QuicStreamManagerTest, TestAppIdleCreatePeerStream) {
TEST_P(QuicStreamManagerTest, TestAppIdleCreatePeerStream) {
auto& manager = *conn.streamManager;
EXPECT_FALSE(manager.isAppIdle());
StreamId id = 0;
@@ -145,7 +155,7 @@ TEST_F(QuicStreamManagerTest, TestAppIdleCreatePeerStream) {
EXPECT_FALSE(manager.isAppIdle());
}
TEST_F(QuicStreamManagerTest, TestAppIdleExistingPeerStream) {
TEST_P(QuicStreamManagerTest, TestAppIdleExistingPeerStream) {
auto& manager = *conn.streamManager;
EXPECT_FALSE(manager.isAppIdle());
EXPECT_CALL(*mockController, setAppIdle(false, _)).Times(0);
@@ -162,7 +172,7 @@ TEST_F(QuicStreamManagerTest, TestAppIdleExistingPeerStream) {
EXPECT_TRUE(manager.isAppIdle());
}
TEST_F(QuicStreamManagerTest, TestAppIdleClosePeerStream) {
TEST_P(QuicStreamManagerTest, TestAppIdleClosePeerStream) {
auto& manager = *conn.streamManager;
EXPECT_FALSE(manager.isAppIdle());
StreamId id = 0;
@@ -178,7 +188,7 @@ TEST_F(QuicStreamManagerTest, TestAppIdleClosePeerStream) {
EXPECT_EQ(manager.getStream(id), nullptr);
}
TEST_F(QuicStreamManagerTest, TestAppIdleCloseControlStream) {
TEST_P(QuicStreamManagerTest, TestAppIdleCloseControlStream) {
auto& manager = *conn.streamManager;
EXPECT_FALSE(manager.isAppIdle());
EXPECT_CALL(*mockController, setAppIdle(false, _)).Times(0);
@@ -198,7 +208,7 @@ TEST_F(QuicStreamManagerTest, TestAppIdleCloseControlStream) {
EXPECT_TRUE(manager.isAppIdle());
}
TEST_F(QuicStreamManagerTest, StreamLimitWindowedUpdate) {
TEST_P(QuicStreamManagerTest, StreamLimitWindowedUpdate) {
auto& manager = *conn.streamManager;
conn.transportSettings.advertisedInitialMaxStreamsBidi = 100;
conn.transportSettings.advertisedInitialMaxStreamsUni = 100;
@@ -229,7 +239,7 @@ TEST_F(QuicStreamManagerTest, StreamLimitWindowedUpdate) {
EXPECT_FALSE(manager.remoteUnidirectionalStreamLimitUpdate());
}
TEST_F(QuicStreamManagerTest, StreamLimitNoWindowedUpdate) {
TEST_P(QuicStreamManagerTest, StreamLimitNoWindowedUpdate) {
auto& manager = *conn.streamManager;
conn.transportSettings.advertisedInitialMaxStreamsBidi = 100;
manager.refreshTransportSettings(conn.transportSettings);
@@ -247,7 +257,7 @@ TEST_F(QuicStreamManagerTest, StreamLimitNoWindowedUpdate) {
EXPECT_FALSE(update);
}
TEST_F(QuicStreamManagerTest, StreamLimitManyWindowedUpdate) {
TEST_P(QuicStreamManagerTest, StreamLimitManyWindowedUpdate) {
auto& manager = *conn.streamManager;
conn.transportSettings.advertisedInitialMaxStreamsBidi = 100;
manager.refreshTransportSettings(conn.transportSettings);
@@ -268,7 +278,7 @@ TEST_F(QuicStreamManagerTest, StreamLimitManyWindowedUpdate) {
EXPECT_FALSE(manager.remoteUnidirectionalStreamLimitUpdate());
}
TEST_F(QuicStreamManagerTest, StreamLimitIncrementBidi) {
TEST_P(QuicStreamManagerTest, StreamLimitIncrementBidi) {
auto& manager = *conn.streamManager;
manager.setMaxLocalBidirectionalStreams(100, true);
manager.refreshTransportSettings(conn.transportSettings);
@@ -283,7 +293,7 @@ TEST_F(QuicStreamManagerTest, StreamLimitIncrementBidi) {
EXPECT_EQ(s.value()->id, max + detail::kStreamIncrement);
}
TEST_F(QuicStreamManagerTest, ConsumeStopSending) {
TEST_P(QuicStreamManagerTest, ConsumeStopSending) {
auto& manager = *conn.streamManager;
manager.addStopSending(0, GenericApplicationErrorCode::NO_ERROR);
EXPECT_EQ(manager.stopSendingStreams().size(), 1);
@@ -294,7 +304,7 @@ TEST_F(QuicStreamManagerTest, ConsumeStopSending) {
EXPECT_TRUE(manager.stopSendingStreams().empty());
}
TEST_F(QuicStreamManagerTest, StreamLimitIncrementUni) {
TEST_P(QuicStreamManagerTest, StreamLimitIncrementUni) {
auto& manager = *conn.streamManager;
manager.setMaxLocalUnidirectionalStreams(100, true);
manager.refreshTransportSettings(conn.transportSettings);
@@ -309,7 +319,7 @@ TEST_F(QuicStreamManagerTest, StreamLimitIncrementUni) {
EXPECT_EQ(s.value()->id, max + detail::kStreamIncrement);
}
TEST_F(QuicStreamManagerTest, NextAcceptableLocalUnidirectionalStreamId) {
TEST_P(QuicStreamManagerTest, NextAcceptableLocalUnidirectionalStreamId) {
auto& manager = *conn.streamManager;
// local is server
@@ -344,7 +354,7 @@ TEST_F(QuicStreamManagerTest, NextAcceptableLocalUnidirectionalStreamId) {
serverStreamId3, manager.nextAcceptableLocalUnidirectionalStreamId());
}
TEST_F(QuicStreamManagerTest, NextAcceptableLocalBidirectionalStreamId) {
TEST_P(QuicStreamManagerTest, NextAcceptableLocalBidirectionalStreamId) {
auto& manager = *conn.streamManager;
// local is server
@@ -378,7 +388,7 @@ TEST_F(QuicStreamManagerTest, NextAcceptableLocalBidirectionalStreamId) {
serverStreamId3, manager.nextAcceptableLocalBidirectionalStreamId());
}
TEST_F(QuicStreamManagerTest, NextAcceptableLocalUnidirectionalStreamIdLimit) {
TEST_P(QuicStreamManagerTest, NextAcceptableLocalUnidirectionalStreamIdLimit) {
auto& manager = *conn.streamManager;
manager.setMaxLocalUnidirectionalStreams(2, true);
@@ -410,7 +420,7 @@ TEST_F(QuicStreamManagerTest, NextAcceptableLocalUnidirectionalStreamIdLimit) {
EXPECT_EQ(folly::none, manager.nextAcceptableLocalUnidirectionalStreamId());
}
TEST_F(QuicStreamManagerTest, NextAcceptableLocalBidirectionalStreamIdLimit) {
TEST_P(QuicStreamManagerTest, NextAcceptableLocalBidirectionalStreamIdLimit) {
auto& manager = *conn.streamManager;
manager.setMaxLocalBidirectionalStreams(2, true);
@@ -441,7 +451,7 @@ TEST_F(QuicStreamManagerTest, NextAcceptableLocalBidirectionalStreamIdLimit) {
EXPECT_EQ(folly::none, manager.nextAcceptableLocalBidirectionalStreamId());
}
TEST_F(QuicStreamManagerTest, NextAcceptablePeerUnidirectionalStreamId) {
TEST_P(QuicStreamManagerTest, NextAcceptablePeerUnidirectionalStreamId) {
auto& manager = *conn.streamManager;
// local is server, so remote/peer is client
@@ -476,7 +486,7 @@ TEST_F(QuicStreamManagerTest, NextAcceptablePeerUnidirectionalStreamId) {
clientStreamId3, manager.nextAcceptablePeerUnidirectionalStreamId());
}
TEST_F(QuicStreamManagerTest, NextAcceptablePeerBidirectionalStreamId) {
TEST_P(QuicStreamManagerTest, NextAcceptablePeerBidirectionalStreamId) {
auto& manager = *conn.streamManager;
// local is server, so remote/peer is client
@@ -504,7 +514,7 @@ TEST_F(QuicStreamManagerTest, NextAcceptablePeerBidirectionalStreamId) {
EXPECT_EQ(clientStreamId3, manager.nextAcceptablePeerBidirectionalStreamId());
}
TEST_F(QuicStreamManagerTest, NextAcceptablePeerUnidirectionalStreamIdLimit) {
TEST_P(QuicStreamManagerTest, NextAcceptablePeerUnidirectionalStreamIdLimit) {
auto& manager = *conn.streamManager;
conn.transportSettings.advertisedInitialMaxStreamsUni = 2;
manager.refreshTransportSettings(conn.transportSettings);
@@ -537,7 +547,7 @@ TEST_F(QuicStreamManagerTest, NextAcceptablePeerUnidirectionalStreamIdLimit) {
EXPECT_EQ(folly::none, manager.nextAcceptablePeerUnidirectionalStreamId());
}
TEST_F(QuicStreamManagerTest, NextAcceptablePeerBidirectionalStreamIdLimit) {
TEST_P(QuicStreamManagerTest, NextAcceptablePeerBidirectionalStreamIdLimit) {
auto& manager = *conn.streamManager;
conn.transportSettings.advertisedInitialMaxStreamsBidi = 2;
manager.refreshTransportSettings(conn.transportSettings);
@@ -565,7 +575,7 @@ TEST_F(QuicStreamManagerTest, NextAcceptablePeerBidirectionalStreamIdLimit) {
EXPECT_EQ(folly::none, manager.nextAcceptablePeerBidirectionalStreamId());
}
TEST_F(QuicStreamManagerTest, TestClearActionable) {
TEST_P(QuicStreamManagerTest, TestClearActionable) {
auto& manager = *conn.streamManager;
StreamId id = 1;
@@ -586,7 +596,7 @@ TEST_F(QuicStreamManagerTest, TestClearActionable) {
EXPECT_TRUE(manager.peekableStreams().empty());
}
TEST_F(QuicStreamManagerTest, WriteBufferMeta) {
TEST_P(QuicStreamManagerTest, WriteBufferMeta) {
auto& manager = *conn.streamManager;
auto stream = manager.createNextUnidirectionalStream().value();
// Add some real data into write buffer
@@ -606,7 +616,7 @@ TEST_F(QuicStreamManagerTest, WriteBufferMeta) {
EXPECT_TRUE(manager.writableDSRStreams().empty());
}
TEST_F(QuicStreamManagerTest, NotifyOnStreamPriorityChanges) {
TEST_P(QuicStreamManagerTest, NotifyOnStreamPriorityChanges) {
// Verify that the StreamPriorityChanges callback function is called
// upon stream creation, priority changes, stream removal.
// For different steps try local (uni/bi)directional streams and remote
@@ -672,7 +682,7 @@ TEST_F(QuicStreamManagerTest, NotifyOnStreamPriorityChanges) {
manager.removeClosedStream(stream2Id);
}
TEST_F(QuicStreamManagerTest, StreamPriorityExcludesControl) {
TEST_P(QuicStreamManagerTest, StreamPriorityExcludesControl) {
MockQuicStreamPrioritiesObserver mObserver;
auto& manager = *conn.streamManager;
@@ -692,5 +702,12 @@ TEST_F(QuicStreamManagerTest, StreamPriorityExcludesControl) {
manager.removeClosedStream(stream->id);
}
INSTANTIATE_TEST_SUITE_P(
QuicStreamManagerTest,
QuicStreamManagerTest,
::testing::Values(
StreamManagerTestParam{.notifyOnNewStreamsExplicitly = false},
StreamManagerTestParam{.notifyOnNewStreamsExplicitly = true}));
} // namespace test
} // namespace quic