From 72e677df33267ac2d572fa5e7bff03d4657a13b0 Mon Sep 17 00:00:00 2001 From: Matt Joras Date: Wed, 18 Sep 2019 11:29:26 -0700 Subject: [PATCH] Send windowed stream limit updates Summary: Implement sending stream limit updates in a windowed fashion, so that as a peer exhausts its streams we will grant it additional credit. This is implemented by having the stream manager check if an update is needed on removing streams, and the api layer potentially sending an update after it initiates the check for closed streams. This also makes some driveby changes to use `std::lower_bound` instead of `std::find` for the sorted collections in the stream manager. Reviewed By: yangchi Differential Revision: D16808229 fbshipit-source-id: f6e3460d43e4d165e362164be00c0cec27cf1e79 --- quic/api/QuicTransportBase.cpp | 8 + quic/api/QuicTransportFunctions.cpp | 11 ++ quic/api/QuicTransportFunctions.h | 2 + quic/api/test/QuicPacketSchedulerTest.cpp | 2 +- quic/client/QuicClientTransport.cpp | 12 +- quic/client/state/ClientStateMachine.h | 3 +- quic/client/test/QuicClientTransportTest.cpp | 25 ++++ quic/codec/QuicWriteCodec.cpp | 34 ++--- quic/codec/Types.h | 7 +- quic/codec/test/QuicPacketRebuilderTest.cpp | 3 +- quic/codec/test/QuicWriteCodecTest.cpp | 21 +-- quic/flowcontrol/test/QuicFlowControlTest.cpp | 4 +- quic/server/state/ServerStateMachine.cpp | 13 +- quic/server/state/ServerStateMachine.h | 3 +- quic/state/QuicStreamManager.cpp | 127 +++++++++++++--- quic/state/QuicStreamManager.h | 138 +++++++++++++++--- quic/state/SimpleFrameFunctions.cpp | 16 ++ quic/state/StateData.h | 3 + quic/state/test/QuicStreamFunctionsTest.cpp | 82 +++++++---- quic/state/test/QuicStreamManagerTest.cpp | 70 +++++++++ 20 files changed, 460 insertions(+), 124 deletions(-) diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 77c0cda2b..aa17261b8 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1979,6 +1979,7 @@ void QuicTransportBase::checkForClosedStream() { stream->totalHolbTime.count(), stream->holbCount); conn_->streamManager->removeClosedStream(*itr); + maybeSendStreamLimitUpdates(*conn_); readCallbacks_.erase(*itr); peekCallbacks_.erase(*itr); itr = conn_->streamManager->closedStreams().erase(itr); @@ -2288,7 +2289,14 @@ void QuicTransportBase::cancelDeliveryCallbacks( void QuicTransportBase::setTransportSettings( TransportSettings transportSettings) { + // If we've already encoded the transport parameters, silently return as + // setting the transport settings again would be buggy. + // TODO should we throw or return Expected here? + if (conn_->transportParametersEncoded) { + return; + } conn_->transportSettings = std::move(transportSettings); + conn_->streamManager->refreshTransportSettings(conn_->transportSettings); setCongestionControl(transportSettings.defaultCongestionController); if (conn_->transportSettings.pacingEnabled) { conn_->pacer = std::make_unique( diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 0d1ab7a30..b85fbe69b 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -1110,4 +1110,15 @@ WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn) { } return WriteDataReason::NO_WRITE; } + +void maybeSendStreamLimitUpdates(QuicConnectionStateBase& conn) { + auto update = conn.streamManager->remoteBidirectionalStreamLimitUpdate(); + if (update) { + sendSimpleFrame(conn, (MaxStreamsFrame(*update, true))); + } + update = conn.streamManager->remoteUnidirectionalStreamLimitUpdate(); + if (update) { + sendSimpleFrame(conn, (MaxStreamsFrame(*update, false))); + } +} } // namespace quic diff --git a/quic/api/QuicTransportFunctions.h b/quic/api/QuicTransportFunctions.h index 6edcbdd68..5cb02a281 100644 --- a/quic/api/QuicTransportFunctions.h +++ b/quic/api/QuicTransportFunctions.h @@ -234,4 +234,6 @@ uint64_t writeProbingDataToSocket( HeaderBuilder LongHeaderBuilder(LongHeader::Types packetType); HeaderBuilder ShortHeaderBuilder(); +void maybeSendStreamLimitUpdates(QuicConnectionStateBase& conn); + } // namespace quic diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index a4c336c8a..60b466faf 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -392,7 +392,7 @@ TEST_F(QuicPacketSchedulerTest, WriteOnlyOutstandingPacketsTest) { bool present = false; /* the next four frames should not be written */ present |= boost::get(&frame) ? true : false; - present |= boost::get(&frame) ? true : false; + present |= boost::get(&frame) ? true : false; present |= boost::get(&frame) ? true : false; present |= boost::get(&frame) ? true : false; ASSERT_FALSE(present); diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index 31968e960..7b1ed4372 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -430,17 +430,6 @@ void QuicClientTransport::processPacketData( *stream, streamWindowUpdate.maximumData, packetNum); } }, - [&](MaxStreamsFrame& maxStreamsFrame) { - VLOG(10) << "Client received max streams frame stream=" - << maxStreamsFrame.maxStreams << *this; - if (maxStreamsFrame.isForBidirectionalStream()) { - conn_->streamManager->setMaxLocalBidirectionalStreams( - maxStreamsFrame.maxStreams); - } else { - conn_->streamManager->setMaxLocalUnidirectionalStreams( - maxStreamsFrame.maxStreams); - } - }, [&](DataBlockedFrame&) { VLOG(10) << "Client received blocked " << *this; pktHasRetransmittableData = true; @@ -874,6 +863,7 @@ void QuicClientTransport::startCryptoHandshake() { conn_->transportSettings.ackDelayExponent, conn_->transportSettings.maxRecvPacketSize, customTransportParameters_); + conn_->transportParametersEncoded = true; auto handshakeLayer = clientConn_->clientHandshakeLayer; handshakeLayer->connect( ctx_, diff --git a/quic/client/state/ClientStateMachine.h b/quic/client/state/ClientStateMachine.h index 74412571a..493d26fce 100644 --- a/quic/client/state/ClientStateMachine.h +++ b/quic/client/state/ClientStateMachine.h @@ -74,7 +74,8 @@ struct QuicClientConnectionState : public QuicConnectionStateBase { // We shouldn't normally need to set this until we're starting the // transport, however writing unit tests is much easier if we set this here. updateFlowControlStateWithSettings(flowControlState, transportSettings); - streamManager = std::make_unique(*this, this->nodeType); + streamManager = std::make_unique( + *this, this->nodeType, transportSettings); } }; diff --git a/quic/client/test/QuicClientTransportTest.cpp b/quic/client/test/QuicClientTransportTest.cpp index fb13a7cb7..c2106bd6d 100644 --- a/quic/client/test/QuicClientTransportTest.cpp +++ b/quic/client/test/QuicClientTransportTest.cpp @@ -502,6 +502,31 @@ TEST_P(QuicClientTransportIntegrationTest, NetworkTestConnected) { sendRequestAndResponseAndWait(*expected, data->clone(), streamId, &readCb); } +TEST_P(QuicClientTransportIntegrationTest, SetTransportSettingsAfterStart) { + expectTransportCallbacks(); + auto qLogger = std::make_shared(); + client->getNonConstConn().qLogger = qLogger; + TransportSettings settings; + settings.connectUDP = true; + client->setTransportSettings(settings); + client->start(&clientConnCallback); + + EXPECT_CALL(clientConnCallback, onTransportReady()).WillOnce(Invoke([&] { + CHECK(client->getConn().oneRttWriteCipher); + eventbase_.terminateLoopSoon(); + })); + eventbase_.loopForever(); + + auto streamId = client->createBidirectionalStream().value(); + auto data = IOBuf::copyBuffer("hello"); + auto expected = std::shared_ptr(IOBuf::copyBuffer("echo ")); + expected->prependChain(data->clone()); + sendRequestAndResponseAndWait(*expected, data->clone(), streamId, &readCb); + settings.connectUDP = false; + client->setTransportSettings(settings); + EXPECT_TRUE(client->getTransportSettings().connectUDP); +} + TEST_P(QuicClientTransportIntegrationTest, TestZeroRttSuccess) { auto cachedPsk = setupZeroRttOnClientCtx(*clientCtx, hostname, getVersion()); pskCache_->putPsk(hostname, cachedPsk); diff --git a/quic/codec/QuicWriteCodec.cpp b/quic/codec/QuicWriteCodec.cpp index aab2cfb96..a1a747f0f 100644 --- a/quic/codec/QuicWriteCodec.cpp +++ b/quic/codec/QuicWriteCodec.cpp @@ -434,6 +434,23 @@ size_t writeSimpleFrame( } // no space left in packet return size_t(0); + }, + [&](MaxStreamsFrame& maxStreamsFrame) { + auto frameType = maxStreamsFrame.isForBidirectionalStream() + ? FrameType::MAX_STREAMS_BIDI + : FrameType::MAX_STREAMS_UNI; + QuicInteger intFrameType(static_cast(frameType)); + QuicInteger streamCount(maxStreamsFrame.maxStreams); + auto maxStreamsFrameSize = + intFrameType.getSize() + streamCount.getSize(); + if (packetSpaceCheck(spaceLeft, maxStreamsFrameSize)) { + builder.write(intFrameType); + builder.write(streamCount); + builder.appendFrame(maxStreamsFrame); + return maxStreamsFrameSize; + } + // no space left in packet + return size_t(0); }); } @@ -520,23 +537,6 @@ size_t writeFrame(QuicWriteFrame&& frame, PacketBuilderInterface& builder) { // no space left in packet return size_t(0); }, - [&](MaxStreamsFrame& maxStreamsFrame) { - auto frameType = maxStreamsFrame.isForBidirectionalStream() - ? FrameType::MAX_STREAMS_BIDI - : FrameType::MAX_STREAMS_UNI; - QuicInteger intFrameType(static_cast(frameType)); - QuicInteger streamCount(maxStreamsFrame.maxStreams); - auto maxStreamsFrameSize = - intFrameType.getSize() + streamCount.getSize(); - if (packetSpaceCheck(spaceLeft, maxStreamsFrameSize)) { - builder.write(intFrameType); - builder.write(streamCount); - builder.appendFrame(std::move(maxStreamsFrame)); - return maxStreamsFrameSize; - } - // no space left in packet - return size_t(0); - }, [&](DataBlockedFrame& blockedFrame) { QuicInteger intFrameType(static_cast(FrameType::DATA_BLOCKED)); QuicInteger dataLimit(blockedFrame.dataLimit); diff --git a/quic/codec/Types.h b/quic/codec/Types.h index e6e7372f9..946abda02 100644 --- a/quic/codec/Types.h +++ b/quic/codec/Types.h @@ -564,7 +564,8 @@ using QuicSimpleFrame = boost::variant< ExpiredStreamDataFrame, PathChallengeFrame, PathResponseFrame, - NewConnectionIdFrame>; + NewConnectionIdFrame, + MaxStreamsFrame>; // Types of frames that can be read. using QuicFrame = boost::variant< @@ -574,7 +575,6 @@ using QuicFrame = boost::variant< ApplicationCloseFrame, MaxDataFrame, MaxStreamDataFrame, - MaxStreamsFrame, PingFrame, DataBlockedFrame, StreamDataBlockedFrame, @@ -594,11 +594,10 @@ using QuicWriteFrame = boost::variant< ApplicationCloseFrame, MaxDataFrame, MaxStreamDataFrame, - MaxStreamsFrame, - StreamsBlockedFrame, PingFrame, DataBlockedFrame, StreamDataBlockedFrame, + StreamsBlockedFrame, WriteAckFrame, WriteStreamFrame, WriteCryptoFrame, diff --git a/quic/codec/test/QuicPacketRebuilderTest.cpp b/quic/codec/test/QuicPacketRebuilderTest.cpp index a608829f9..e746c3d9d 100644 --- a/quic/codec/test/QuicPacketRebuilderTest.cpp +++ b/quic/codec/test/QuicPacketRebuilderTest.cpp @@ -130,7 +130,8 @@ TEST_F(QuicPacketRebuilderTest, RebuildPacket) { EXPECT_EQ("The sun is in the sky.", closeFrame.reasonPhrase); EXPECT_EQ(FrameType::ACK, closeFrame.closingFrameType); }, - [](const MaxStreamsFrame& maxStreamFrame) { + [](const QuicSimpleFrame& simpleFrame) { + auto maxStreamFrame = boost::get(simpleFrame); EXPECT_EQ(4321, maxStreamFrame.maxStreams); }, [](const PingFrame& ping) { EXPECT_EQ(PingFrame(), ping); }, diff --git a/quic/codec/test/QuicWriteCodecTest.cpp b/quic/codec/test/QuicWriteCodecTest.cpp index 08124f8d3..5eb6057e8 100644 --- a/quic/codec/test/QuicWriteCodecTest.cpp +++ b/quic/codec/test/QuicWriteCodecTest.cpp @@ -968,13 +968,14 @@ TEST_F(QuicWriteCodecTest, WriteMaxStreamId) { auto streamCountSize = i < 64 ? 1 : 2; // 1 byte for the type and up to 2 bytes for the stream count. EXPECT_EQ(1 + streamCountSize, bytesWritten); - auto resultMaxStreamIdFrame = - boost::get(regularPacket.frames[0]); + auto resultMaxStreamIdFrame = boost::get( + boost::get(regularPacket.frames[0])); EXPECT_EQ(i, resultMaxStreamIdFrame.maxStreams); auto wireBuf = std::move(builtOut.second); folly::io::Cursor cursor(wireBuf.get()); - auto wireStreamsFrame = boost::get(parseQuicFrame(cursor)); + auto wireStreamsFrame = boost::get( + boost::get(parseQuicFrame(cursor))); EXPECT_EQ(i, wireStreamsFrame.maxStreams); EXPECT_TRUE(cursor.isAtEnd()); } @@ -994,13 +995,14 @@ TEST_F(QuicWriteCodecTest, WriteUniMaxStreamId) { auto streamCountSize = i < 64 ? 1 : 2; // 1 byte for the type and up to 2 bytes for the stream count. EXPECT_EQ(1 + streamCountSize, bytesWritten); - auto resultMaxStreamIdFrame = - boost::get(regularPacket.frames[0]); + auto resultMaxStreamIdFrame = boost::get( + boost::get(regularPacket.frames[0])); EXPECT_EQ(i, resultMaxStreamIdFrame.maxStreams); auto wireBuf = std::move(builtOut.second); folly::io::Cursor cursor(wireBuf.get()); - auto wireStreamsFrame = boost::get(parseQuicFrame(cursor)); + auto wireStreamsFrame = boost::get( + boost::get(parseQuicFrame(cursor))); EXPECT_EQ(i, wireStreamsFrame.maxStreams); EXPECT_TRUE(cursor.isAtEnd()); } @@ -1268,12 +1270,13 @@ TEST_F(QuicWriteCodecTest, WriteStreamIdNeeded) { auto builtOut = std::move(pktBuilder).buildPacket(); auto regularPacket = builtOut.first; EXPECT_EQ(bytesWritten, 3); - EXPECT_NO_THROW(boost::get(regularPacket.frames[0])); + EXPECT_NO_THROW(boost::get( + boost::get(regularPacket.frames[0]))); auto wireBuf = std::move(builtOut.second); folly::io::Cursor cursor(wireBuf.get()); - auto writeStreamIdBlocked = - boost::get(parseQuicFrame(cursor)); + auto writeStreamIdBlocked = boost::get( + boost::get(parseQuicFrame(cursor))); EXPECT_EQ(writeStreamIdBlocked.maxStreams, blockedStreamId); EXPECT_TRUE(cursor.isAtEnd()); } diff --git a/quic/flowcontrol/test/QuicFlowControlTest.cpp b/quic/flowcontrol/test/QuicFlowControlTest.cpp index 9f2c2d200..0068d6c04 100644 --- a/quic/flowcontrol/test/QuicFlowControlTest.cpp +++ b/quic/flowcontrol/test/QuicFlowControlTest.cpp @@ -24,8 +24,8 @@ class QuicFlowControlTest : public Test { public: void SetUp() override { transportInfoCb_ = std::make_unique(); - conn_.streamManager = - std::make_unique(conn_, conn_.nodeType); + conn_.streamManager = std::make_unique( + conn_, conn_.nodeType, conn_.transportSettings); conn_.infoCallback = transportInfoCb_.get(); } std::unique_ptr transportInfoCb_; diff --git a/quic/server/state/ServerStateMachine.cpp b/quic/server/state/ServerStateMachine.cpp index d6ee6c731..0f8acd1a8 100644 --- a/quic/server/state/ServerStateMachine.cpp +++ b/quic/server/state/ServerStateMachine.cpp @@ -494,6 +494,7 @@ void onServerReadDataFromOpen( conn.transportSettings.maxRecvPacketSize, conn.transportSettings.partialReliabilityEnabled, token)); + conn.transportParametersEncoded = true; QuicFizzFactory fizzFactory; FizzCryptoFactory cryptoFactory(&fizzFactory); conn.readCodec = std::make_unique(QuicNodeType::Server); @@ -870,18 +871,6 @@ void onServerReadDataFromOpen( *stream, streamWindowUpdate.maximumData, packetNum); } }, - [&](MaxStreamsFrame& maxStreamsFrame) { - VLOG(10) << "Server received max streams frame stream=" - << maxStreamsFrame.maxStreams << " " << conn; - isNonProbingPacket = true; - if (maxStreamsFrame.isForBidirectionalStream()) { - conn.streamManager->setMaxLocalBidirectionalStreams( - maxStreamsFrame.maxStreams); - } else { - conn.streamManager->setMaxLocalUnidirectionalStreams( - maxStreamsFrame.maxStreams); - } - }, [&](DataBlockedFrame&) { VLOG(10) << "Server received blocked " << conn; pktHasRetransmittableData = true; diff --git a/quic/server/state/ServerStateMachine.h b/quic/server/state/ServerStateMachine.h index 929f44ca2..f0ec3bf54 100644 --- a/quic/server/state/ServerStateMachine.h +++ b/quic/server/state/ServerStateMachine.h @@ -132,7 +132,8 @@ struct QuicServerConnectionState : public QuicConnectionStateBase { pendingZeroRttData = std::make_unique>(); pendingOneRttData = std::make_unique>(); - streamManager = std::make_unique(*this, this->nodeType); + streamManager = std::make_unique( + *this, this->nodeType, transportSettings); } }; diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index f90ecf76d..3d8032fd1 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -109,8 +109,8 @@ void QuicStreamManager::setMaxLocalBidirectionalStreams( "Attempt to set maxStreams beyond the max allowed.", TransportErrorCode::STREAM_LIMIT_ERROR); } - StreamId maxStreamId = - maxStreams * detail::kStreamIncrement + initialBidirectionalStreamId_; + StreamId maxStreamId = maxStreams * detail::kStreamIncrement + + initialLocalBidirectionalStreamId_; if (force || maxStreamId > maxLocalBidirectionalStreamId_) { maxLocalBidirectionalStreamId_ = maxStreamId; } @@ -124,21 +124,68 @@ void QuicStreamManager::setMaxLocalUnidirectionalStreams( "Attempt to set maxStreams beyond the max allowed.", TransportErrorCode::STREAM_LIMIT_ERROR); } - StreamId maxStreamId = - maxStreams * detail::kStreamIncrement + initialUnidirectionalStreamId_; + StreamId maxStreamId = maxStreams * detail::kStreamIncrement + + initialLocalUnidirectionalStreamId_; if (force || maxStreamId > maxLocalUnidirectionalStreamId_) { maxLocalUnidirectionalStreamId_ = maxStreamId; } } +void QuicStreamManager::setMaxRemoteBidirectionalStreams(uint64_t maxStreams) { + setMaxRemoteBidirectionalStreamsInternal(maxStreams, false); +} + +void QuicStreamManager::setMaxRemoteUnidirectionalStreams(uint64_t maxStreams) { + setMaxRemoteUnidirectionalStreamsInternal(maxStreams, false); +} + +void QuicStreamManager::setMaxRemoteBidirectionalStreamsInternal( + uint64_t maxStreams, + bool force) { + if (maxStreams > kMaxMaxStreams) { + throw QuicTransportException( + "Attempt to set maxStreams beyond the max allowed.", + TransportErrorCode::STREAM_LIMIT_ERROR); + } + StreamId maxStreamId = maxStreams * detail::kStreamIncrement + + initialRemoteBidirectionalStreamId_; + if (force || maxStreamId > maxRemoteBidirectionalStreamId_) { + maxRemoteBidirectionalStreamId_ = maxStreamId; + } +} + +void QuicStreamManager::setMaxRemoteUnidirectionalStreamsInternal( + uint64_t maxStreams, + bool force) { + if (maxStreams > kMaxMaxStreams) { + throw QuicTransportException( + "Attempt to set maxStreams beyond the max allowed.", + TransportErrorCode::STREAM_LIMIT_ERROR); + } + StreamId maxStreamId = maxStreams * detail::kStreamIncrement + + initialRemoteUnidirectionalStreamId_; + if (force || maxStreamId > maxRemoteUnidirectionalStreamId_) { + maxRemoteUnidirectionalStreamId_ = maxStreamId; + } +} + +void QuicStreamManager::refreshTransportSettings( + const TransportSettings& settings) { + transportSettings_ = &settings; + setMaxRemoteBidirectionalStreamsInternal( + transportSettings_->advertisedInitialMaxStreamsBidi, true); + setMaxRemoteUnidirectionalStreamsInternal( + transportSettings_->advertisedInitialMaxStreamsUni, true); +} + // We create local streams lazily. If a local stream was created // but not allocated yet, this will allocate a stream. // This will return nullptr if a stream is closed or un-opened. QuicStreamState* FOLLY_NULLABLE QuicStreamManager::getOrCreateOpenedLocalStream(StreamId streamId) { - auto streamIdx = - std::find(openLocalStreams_.begin(), openLocalStreams_.end(), streamId); - if (streamIdx != openLocalStreams_.end()) { + auto streamIdx = std::lower_bound( + openLocalStreams_.begin(), openLocalStreams_.end(), streamId); + if (streamIdx != openLocalStreams_.end() && *streamIdx == streamId) { // Open a lazily created stream. auto it = streams_.emplace( std::piecewise_construct, @@ -217,9 +264,12 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) { if (peerStream != streams_.end()) { return &peerStream->second; } - auto streamIdx = - std::find(openPeerStreams_.begin(), openPeerStreams_.end(), streamId); - if (streamIdx != openPeerStreams_.end()) { + auto& openPeerStreams = isUnidirectionalStream(streamId) + ? openUnidirectionalPeerStreams_ + : openBidirectionalPeerStreams_; + auto streamIdx = std::lower_bound( + openPeerStreams.begin(), openPeerStreams.end(), streamId); + if (streamIdx != openPeerStreams.end() && *streamIdx == streamId) { // Stream was already open, create the state for it lazily. auto it = streams_.emplace( std::piecewise_construct, @@ -229,7 +279,7 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) { return &it.first->second; } - auto previousPeerStreams = openPeerStreams_; + auto previousPeerStreams = folly::copy(openPeerStreams); auto& nextAcceptableStreamId = isUnidirectionalStream(streamId) ? nextAcceptablePeerUnidirectionalStreamId_ : nextAcceptablePeerBidirectionalStreamId_; @@ -237,7 +287,7 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) { ? maxRemoteUnidirectionalStreamId_ : maxRemoteBidirectionalStreamId_; auto openedResult = openStreamIfNotClosed( - streamId, openPeerStreams_, nextAcceptableStreamId, maxStreamId); + streamId, openPeerStreams, nextAcceptableStreamId, maxStreamId); if (openedResult == LocalErrorCode::CREATING_EXISTING_STREAM) { // Stream could be closed here. return nullptr; @@ -248,8 +298,8 @@ QuicStreamManager::getOrCreatePeerStream(StreamId streamId) { // Copy over the new streams. std::set_difference( - openPeerStreams_.begin(), - openPeerStreams_.end(), + openPeerStreams.begin(), + openPeerStreams.end(), std::make_move_iterator(previousPeerStreams.begin()), std::make_move_iterator(previousPeerStreams.end()), std::back_inserter(newPeerStreams_)); @@ -331,14 +381,49 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { } streams_.erase(it); QUIC_STATS(conn_.infoCallback, onQuicStreamClosed); - auto streamItr = - std::find(openPeerStreams_.begin(), openPeerStreams_.end(), streamId); - if (streamItr != openPeerStreams_.end()) { - openPeerStreams_.erase(streamItr); + if (isRemoteStream(nodeType_, streamId)) { + auto& openPeerStreams = isUnidirectionalStream(streamId) + ? openUnidirectionalPeerStreams_ + : openBidirectionalPeerStreams_; + auto streamItr = std::lower_bound( + openPeerStreams.begin(), openPeerStreams.end(), streamId); + if (streamItr != openPeerStreams.end() && *streamItr == streamId) { + openPeerStreams.erase(streamItr); + // Check if we should send a stream limit update. We need to send an + // update every time we've closed a number of streams >= the set windowing + // fraction. + uint64_t initialStreamLimit = isUnidirectionalStream(streamId) + ? transportSettings_->advertisedInitialMaxStreamsUni + : transportSettings_->advertisedInitialMaxStreamsBidi; + uint64_t streamWindow = + initialStreamLimit / streamLimitWindowingFraction_; + uint64_t openableRemoteStreams = isUnidirectionalStream(streamId) + ? openableRemoteUnidirectionalStreams() + : openableRemoteBidirectionalStreams(); + // The "credit" here is how much available stream space we have based on + // what the initial stream limit was set to. + uint64_t streamCredit = + initialStreamLimit - openableRemoteStreams - openPeerStreams.size(); + if (streamCredit >= streamWindow) { + if (isUnidirectionalStream(streamId)) { + uint64_t maxStreams = (maxRemoteUnidirectionalStreamId_ - + initialRemoteUnidirectionalStreamId_) / + detail::kStreamIncrement; + setMaxRemoteUnidirectionalStreams(maxStreams + streamCredit); + remoteUnidirectionalStreamLimitUpdate_ = maxStreams + streamCredit; + } else { + uint64_t maxStreams = (maxRemoteBidirectionalStreamId_ - + initialRemoteBidirectionalStreamId_) / + detail::kStreamIncrement; + setMaxRemoteBidirectionalStreams(maxStreams + streamCredit); + remoteBidirectionalStreamLimitUpdate_ = maxStreams + streamCredit; + } + } + } } else { - streamItr = - std::find(openLocalStreams_.begin(), openLocalStreams_.end(), streamId); - if (streamItr != openLocalStreams_.end()) { + auto streamItr = std::lower_bound( + openLocalStreams_.begin(), openLocalStreams_.end(), streamId); + if (streamItr != openLocalStreams_.end() && *streamItr == streamId) { openLocalStreams_.erase(streamItr); } } diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index 59820dd1e..dc3fb64c6 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -28,8 +29,11 @@ class QuicStreamManager { public: explicit QuicStreamManager( QuicConnectionStateBase& conn, - QuicNodeType nodeType) - : conn_(conn), nodeType_(nodeType) { + QuicNodeType nodeType, + const TransportSettings& transportSettings) + : conn_(conn), + nodeType_(nodeType), + transportSettings_(&transportSettings) { if (nodeType == QuicNodeType::Server) { nextAcceptablePeerBidirectionalStreamId_ = 0x00; nextAcceptablePeerUnidirectionalStreamId_ = 0x02; @@ -37,8 +41,10 @@ class QuicStreamManager { nextAcceptableLocalUnidirectionalStreamId_ = 0x03; nextBidirectionalStreamId_ = 0x01; nextUnidirectionalStreamId_ = 0x03; - initialBidirectionalStreamId_ = 0x01; - initialUnidirectionalStreamId_ = 0x03; + initialLocalBidirectionalStreamId_ = 0x01; + initialLocalUnidirectionalStreamId_ = 0x03; + initialRemoteBidirectionalStreamId_ = 0x00; + initialRemoteUnidirectionalStreamId_ = 0x02; } else { nextAcceptablePeerBidirectionalStreamId_ = 0x01; nextAcceptablePeerUnidirectionalStreamId_ = 0x03; @@ -46,9 +52,12 @@ class QuicStreamManager { nextAcceptableLocalUnidirectionalStreamId_ = 0x02; nextBidirectionalStreamId_ = 0x00; nextUnidirectionalStreamId_ = 0x02; - initialBidirectionalStreamId_ = 0x00; - initialUnidirectionalStreamId_ = 0x02; + initialLocalBidirectionalStreamId_ = 0x00; + initialLocalUnidirectionalStreamId_ = 0x02; + initialRemoteBidirectionalStreamId_ = 0x01; + initialRemoteUnidirectionalStreamId_ = 0x03; } + refreshTransportSettings(transportSettings); } /* * Create the state for a stream if it does not exist and return it. Note this @@ -117,7 +126,13 @@ class QuicStreamManager { */ bool streamExists(StreamId streamId) { return std::binary_search( - openPeerStreams_.begin(), openPeerStreams_.end(), streamId) || + openBidirectionalPeerStreams_.begin(), + openBidirectionalPeerStreams_.end(), + streamId) || + std::binary_search( + openUnidirectionalPeerStreams_.begin(), + openUnidirectionalPeerStreams_.end(), + streamId) || std::binary_search( openLocalStreams_.begin(), openLocalStreams_.end(), streamId); } @@ -158,7 +173,8 @@ class QuicStreamManager { */ void clearOpenStreams() { openLocalStreams_.clear(); - openPeerStreams_.clear(); + openBidirectionalPeerStreams_.clear(); + openUnidirectionalPeerStreams_.clear(); streams_.clear(); } @@ -277,6 +293,54 @@ class QuicStreamManager { uint64_t maxStreams, bool force = false); + /* + * Set the max number of remote bidirectional streams. Can only be increased + * unless force is true. + */ + void setMaxRemoteBidirectionalStreams(uint64_t maxStreams); + + /* + * Set the max number of remote unidirectional streams. Can only be increased + * unless force is true. + */ + void setMaxRemoteUnidirectionalStreams(uint64_t maxStreams); + + void refreshTransportSettings(const TransportSettings& settings); + + /* + * Sets the "window-by" fraction for sending stream limit updates. E.g. + * setting the fraction to two when the initial stream limit was 100 will + * cause the stream manager to update the relevant stream limit update when + * 50 streams have been closed. + */ + void setStreamLimitWindowingFraction(uint64_t fraction) { + if (fraction > 0) { + streamLimitWindowingFraction_ = fraction; + } + } + + /* + * The next value that should be sent in a bidirectional max streams frame, + * if any. This is potentially updated every time a bidirectional stream is + * closed. Calling this function "consumes" the update. + */ + folly::Optional remoteBidirectionalStreamLimitUpdate() { + auto ret = remoteBidirectionalStreamLimitUpdate_; + remoteBidirectionalStreamLimitUpdate_ = folly::none; + return ret; + } + + /* + * The next value that should be sent in a unidirectional max streams frame, + * if any. This is potentially updated every time a unidirectional stream is + * closed. Calling this function "consumes" the update. + */ + folly::Optional remoteUnidirectionalStreamLimitUpdate() { + auto ret = remoteUnidirectionalStreamLimitUpdate_; + remoteUnidirectionalStreamLimitUpdate_ = folly::none; + return ret; + } + /* * Returns a const reference to the underlying stream window updates * container. @@ -488,10 +552,20 @@ class QuicStreamManager { // TODO figure out a better interface here. /* - * Returns a mutable reference to the underlying open peer streams container. + * Returns a mutable reference to the underlying open bidirectional peer + * streams container. */ - auto& openPeerStreams() { - return openPeerStreams_; + auto& openBidirectionalPeerStreams() { + return openBidirectionalPeerStreams_; + } + + // TODO figure out a better interface here. + /* + * Returns a mutable reference to the underlying open peer unidirectional + * streams container. + */ + auto& openUnidirectionalPeerStreams() { + return openUnidirectionalPeerStreams_; } // TODO figure out a better interface here. @@ -579,6 +653,13 @@ class QuicStreamManager { QuicStreamState* FOLLY_NULLABLE getOrCreatePeerStream(StreamId streamId); + void setMaxRemoteBidirectionalStreamsInternal( + uint64_t maxStreams, + bool force); + void setMaxRemoteUnidirectionalStreamsInternal( + uint64_t maxStreams, + bool force); + QuicConnectionStateBase& conn_; QuicNodeType nodeType_; @@ -608,18 +689,39 @@ class QuicStreamManager { StreamId maxLocalUnidirectionalStreamId_{0}; - StreamId maxRemoteBidirectionalStreamId_{kMaxStreamId}; + StreamId maxRemoteBidirectionalStreamId_{0}; - StreamId maxRemoteUnidirectionalStreamId_{kMaxStreamId}; + StreamId maxRemoteUnidirectionalStreamId_{0}; - StreamId initialBidirectionalStreamId_{0}; + StreamId initialLocalBidirectionalStreamId_{0}; - StreamId initialUnidirectionalStreamId_{0}; + StreamId initialLocalUnidirectionalStreamId_{0}; + + StreamId initialRemoteBidirectionalStreamId_{0}; + + StreamId initialRemoteUnidirectionalStreamId_{0}; + + // The fraction to determine the window by which we will signal the need to + // send stream limit updates + uint64_t streamLimitWindowingFraction_{2}; + + // Contains the value of a stream window update that should be sent for + // remote bidirectional streams. + folly::Optional remoteBidirectionalStreamLimitUpdate_; + + // Contains the value of a stream window update that should be sent for + // remote bidirectional streams. + folly::Optional remoteUnidirectionalStreamLimitUpdate_; uint64_t numControlStreams_{0}; - // Streams that are opened by the peer on the connection. Ordered by id. - std::deque openPeerStreams_; + // Bidirectional streams that are opened by the peer on the connection. + // Ordered by id. + std::deque openBidirectionalPeerStreams_; + + // Unidirectional streams that are opened by the peer on the connection. + // Ordered by id. + std::deque openUnidirectionalPeerStreams_; // Streams that are opened locally on the connection. Ordered by id. std::deque openLocalStreams_; @@ -668,6 +770,8 @@ class QuicStreamManager { // Record whether or not we are app-idle. bool isAppIdle_{false}; + + const TransportSettings* transportSettings_; }; } // namespace quic diff --git a/quic/state/SimpleFrameFunctions.cpp b/quic/state/SimpleFrameFunctions.cpp index 674db35f7..3296292e4 100644 --- a/quic/state/SimpleFrameFunctions.cpp +++ b/quic/state/SimpleFrameFunctions.cpp @@ -63,6 +63,9 @@ folly::Optional updateSimpleFrameOnPacketClone( [&](const NewConnectionIdFrame& frame) -> folly::Optional { return QuicSimpleFrame(frame); + }, + [&](const MaxStreamsFrame& frame) -> folly::Optional { + return QuicSimpleFrame(frame); }); } @@ -124,6 +127,9 @@ void updateSimpleFrameOnPacketLoss( }, [&](const NewConnectionIdFrame& frame) { conn.pendingEvents.frames.push_back(frame); + }, + [&](const MaxStreamsFrame& frame) { + conn.pendingEvents.frames.push_back(frame); }); } @@ -176,6 +182,16 @@ bool updateSimpleFrameOnPacketReceived( [&](const NewConnectionIdFrame&) { // TODO junqiw return false; + }, + [&](const MaxStreamsFrame& maxStreamsFrame) { + if (maxStreamsFrame.isForBidirectionalStream()) { + conn.streamManager->setMaxLocalBidirectionalStreams( + maxStreamsFrame.maxStreams); + } else { + conn.streamManager->setMaxLocalUnidirectionalStreams( + maxStreamsFrame.maxStreams); + } + return true; }); } diff --git a/quic/state/StateData.h b/quic/state/StateData.h index 722266822..ddf8f6479 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -594,6 +594,9 @@ struct QuicConnectionStateBase { // Settings for transports. TransportSettings transportSettings; + // Whether we've set the transporot parameters from transportSettings yet. + bool transportParametersEncoded{false}; + // Value of the negotiated ack delay exponent. uint64_t peerAckDelayExponent{kDefaultAckDelayExponent}; diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index 36d40c465..6d6185741 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -876,19 +876,19 @@ TEST_F(QuicServerStreamFunctionsTest, GetOrCreateClientOutOfOrderStream) { EXPECT_TRUE(conn.streamManager->streamExists(outOfOrderStream)); // peer stream starts from 0x00 EXPECT_EQ( - conn.streamManager->openPeerStreams().size(), + conn.streamManager->openBidirectionalPeerStreams().size(), ((outOfOrderStream) / kStreamIncrement) + 1); conn.streamManager->getStream(existingStream); EXPECT_EQ(conn.streamManager->streamCount(), 2); EXPECT_TRUE(conn.streamManager->streamExists(outOfOrderStream)); EXPECT_EQ( - conn.streamManager->openPeerStreams().size(), + conn.streamManager->openBidirectionalPeerStreams().size(), ((outOfOrderStream) / kStreamIncrement) + 1); - conn.streamManager->openPeerStreams().erase(std::find( - conn.streamManager->openPeerStreams().begin(), - conn.streamManager->openPeerStreams().end(), + conn.streamManager->openBidirectionalPeerStreams().erase(std::find( + conn.streamManager->openBidirectionalPeerStreams().begin(), + conn.streamManager->openBidirectionalPeerStreams().end(), closedStream)); EXPECT_EQ(conn.streamManager->getStream(closedStream), nullptr); } @@ -916,9 +916,9 @@ TEST_F(QuicServerStreamFunctionsTest, GetOrCreateClosedClientStream) { StreamId outOfOrderStream1 = 100; StreamId closedStream = 48; conn.streamManager->getStream(outOfOrderStream1); - conn.streamManager->openPeerStreams().erase(std::find( - conn.streamManager->openPeerStreams().begin(), - conn.streamManager->openPeerStreams().end(), + conn.streamManager->openBidirectionalPeerStreams().erase(std::find( + conn.streamManager->openBidirectionalPeerStreams().begin(), + conn.streamManager->openBidirectionalPeerStreams().end(), closedStream)); EXPECT_EQ(conn.streamManager->getStream(closedStream), nullptr); } @@ -929,13 +929,13 @@ TEST_F( StreamId outOfOrderStream1 = 96; StreamId outOfOrderStream2 = 100; conn.streamManager->getStream(outOfOrderStream1); - conn.streamManager->openPeerStreams().erase(std::find( - conn.streamManager->openPeerStreams().begin(), - conn.streamManager->openPeerStreams().end(), + conn.streamManager->openBidirectionalPeerStreams().erase(std::find( + conn.streamManager->openBidirectionalPeerStreams().begin(), + conn.streamManager->openBidirectionalPeerStreams().end(), outOfOrderStream1)); conn.streamManager->getStream(outOfOrderStream2); EXPECT_EQ( - conn.streamManager->openPeerStreams().size(), + conn.streamManager->openBidirectionalPeerStreams().size(), (outOfOrderStream2) / kStreamIncrement); } @@ -943,13 +943,13 @@ TEST_F(QuicStreamFunctionsTest, GetOrCreateServerStreamAfterClosingLastStream) { StreamId outOfOrderStream1 = 97; StreamId outOfOrderStream2 = 101; conn.streamManager->getStream(outOfOrderStream1); - conn.streamManager->openPeerStreams().erase(std::find( - conn.streamManager->openPeerStreams().begin(), - conn.streamManager->openPeerStreams().end(), + conn.streamManager->openBidirectionalPeerStreams().erase(std::find( + conn.streamManager->openBidirectionalPeerStreams().begin(), + conn.streamManager->openBidirectionalPeerStreams().end(), outOfOrderStream1)); conn.streamManager->getStream(outOfOrderStream2); EXPECT_EQ( - conn.streamManager->openPeerStreams().size(), + conn.streamManager->openBidirectionalPeerStreams().size(), (outOfOrderStream2 + 1) / kStreamIncrement); } @@ -957,9 +957,9 @@ TEST_F(QuicStreamFunctionsTest, GetOrCreateClosedServerStream) { StreamId outOfOrderStream1 = 97; StreamId closedStream = 49; conn.streamManager->getStream(outOfOrderStream1); - conn.streamManager->openPeerStreams().erase(std::find( - conn.streamManager->openPeerStreams().begin(), - conn.streamManager->openPeerStreams().end(), + conn.streamManager->openBidirectionalPeerStreams().erase(std::find( + conn.streamManager->openBidirectionalPeerStreams().begin(), + conn.streamManager->openBidirectionalPeerStreams().end(), closedStream)); EXPECT_EQ(conn.streamManager->getStream(closedStream), nullptr); } @@ -1537,13 +1537,13 @@ TEST_F(QuicServerStreamFunctionsTest, ServerGetClientQuicStream) { std::deque newStreams = {0x0, 0x4, 0x8, 0xc, 0x10}; EXPECT_EQ(conn.streamManager->getStream(clientStream)->id, clientStream); EXPECT_EQ(conn.streamManager->streamCount(), 1); - EXPECT_EQ(conn.streamManager->openPeerStreams().size(), 5); + EXPECT_EQ(conn.streamManager->openBidirectionalPeerStreams().size(), 5); EXPECT_EQ(conn.streamManager->newPeerStreams(), newStreams); StreamId clientStream2 = 0x4; EXPECT_EQ(conn.streamManager->getStream(clientStream2)->id, clientStream2); EXPECT_EQ(conn.streamManager->streamCount(), 2); - EXPECT_EQ(conn.streamManager->openPeerStreams().size(), 5); + EXPECT_EQ(conn.streamManager->openBidirectionalPeerStreams().size(), 5); EXPECT_EQ(conn.streamManager->newPeerStreams().size(), 5); EXPECT_EQ(conn.streamManager->newPeerStreams(), newStreams); @@ -1551,7 +1551,7 @@ TEST_F(QuicServerStreamFunctionsTest, ServerGetClientQuicStream) { newStreams = {0x0, 0x2, 0x4, 0x6, 0x8, 0xc, 0x10}; EXPECT_EQ(conn.streamManager->getStream(clientStream3)->id, clientStream3); EXPECT_EQ(conn.streamManager->streamCount(), 3); - EXPECT_EQ(conn.streamManager->openPeerStreams().size(), 7); + EXPECT_EQ(conn.streamManager->openUnidirectionalPeerStreams().size(), 2); std::sort( conn.streamManager->newPeerStreams().begin(), conn.streamManager->newPeerStreams().end()); @@ -1636,12 +1636,12 @@ TEST_F(QuicStreamFunctionsTest, ClientGetServerQuicStream) { StreamId serverStream = 0x09; EXPECT_EQ(conn.streamManager->getStream(serverStream)->id, serverStream); EXPECT_EQ(conn.streamManager->streamCount(), 1); - EXPECT_EQ(conn.streamManager->openPeerStreams().size(), 3); + EXPECT_EQ(conn.streamManager->openBidirectionalPeerStreams().size(), 3); StreamId serverStream2 = 0x05; EXPECT_EQ(conn.streamManager->getStream(serverStream2)->id, serverStream2); EXPECT_EQ(conn.streamManager->streamCount(), 2); - EXPECT_EQ(conn.streamManager->openPeerStreams().size(), 3); + EXPECT_EQ(conn.streamManager->openBidirectionalPeerStreams().size(), 3); } TEST_F(QuicStreamFunctionsTest, ClientGetClientQuicStream) { @@ -1692,10 +1692,10 @@ TEST_F(QuicStreamFunctionsTest, StreamExists) { EXPECT_TRUE(conn.streamManager->streamExists(peerAutoOpened)); auto it = std::find( - conn.streamManager->openPeerStreams().begin(), - conn.streamManager->openPeerStreams().end(), + conn.streamManager->openBidirectionalPeerStreams().begin(), + conn.streamManager->openBidirectionalPeerStreams().end(), peerAutoOpened); - conn.streamManager->openPeerStreams().erase(it); + conn.streamManager->openBidirectionalPeerStreams().erase(it); conn.streamManager->removeClosedStream(peerStream); @@ -1704,6 +1704,34 @@ TEST_F(QuicStreamFunctionsTest, StreamExists) { EXPECT_TRUE(conn.streamManager->streamExists(peerAutoOpened2)); } +TEST_F(QuicStreamFunctionsTest, StreamLimitUpdates) { + StreamId peerStream = 13; + StreamId peerAutoOpened = 5; + StreamId peerAutoOpened2 = 9; + StreamId notOpenedPeer = 17; + + conn.streamManager->setStreamLimitWindowingFraction( + conn.transportSettings.advertisedInitialMaxStreamsBidi); + conn.streamManager->getStream(peerStream)->send.state = + StreamSendStates::Closed{}; + conn.streamManager->getStream(peerStream)->recv.state = + StreamReceiveStates::Closed{}; + EXPECT_FALSE(conn.streamManager->streamExists(notOpenedPeer)); + EXPECT_TRUE(conn.streamManager->streamExists(peerStream)); + EXPECT_TRUE(conn.streamManager->streamExists(peerAutoOpened)); + + conn.streamManager->removeClosedStream(peerStream); + + EXPECT_FALSE(conn.streamManager->streamExists(peerStream)); + EXPECT_TRUE(conn.streamManager->streamExists(peerAutoOpened)); + EXPECT_TRUE(conn.streamManager->streamExists(peerAutoOpened2)); + auto update = conn.streamManager->remoteBidirectionalStreamLimitUpdate(); + ASSERT_TRUE(update); + EXPECT_EQ( + update.value(), + conn.transportSettings.advertisedInitialMaxStreamsBidi + 1); +} + TEST_F(QuicStreamFunctionsTest, AllBytesTillFinAcked) { StreamId id = 3; QuicStreamState stream(id, conn); diff --git a/quic/state/test/QuicStreamManagerTest.cpp b/quic/state/test/QuicStreamManagerTest.cpp index 71920b560..8c8ec6347 100644 --- a/quic/state/test/QuicStreamManagerTest.cpp +++ b/quic/state/test/QuicStreamManagerTest.cpp @@ -178,5 +178,75 @@ TEST_F(QuicStreamManagerTest, TestAppIdleCloseControlStream) { manager.removeClosedStream(stream->id); EXPECT_TRUE(manager.isAppIdle()); } + +TEST_F(QuicStreamManagerTest, StreamLimitWindowedUpdate) { + auto& manager = *conn.streamManager; + conn.transportSettings.advertisedInitialMaxStreamsBidi = 100; + conn.transportSettings.advertisedInitialMaxStreamsUni = 100; + manager.refreshTransportSettings(conn.transportSettings); + manager.setStreamLimitWindowingFraction(4); + for (int i = 0; i < 100; i++) { + manager.getStream(i * detail::kStreamIncrement); + manager.getStream(2 + i * detail::kStreamIncrement); + } + for (int i = 0; i < 25; i++) { + auto stream = manager.getStream(i * detail::kStreamIncrement); + stream->send.state = StreamSendStates::Closed(); + stream->recv.state = StreamReceiveStates::Closed(); + manager.removeClosedStream(stream->id); + stream = manager.getStream(2 + i * detail::kStreamIncrement); + stream->send.state = StreamSendStates::Closed(); + stream->recv.state = StreamReceiveStates::Closed(); + manager.removeClosedStream(stream->id); + } + auto update = manager.remoteBidirectionalStreamLimitUpdate(); + ASSERT_TRUE(update); + EXPECT_EQ(update.value(), 125); + EXPECT_FALSE(manager.remoteBidirectionalStreamLimitUpdate()); + + update = manager.remoteUnidirectionalStreamLimitUpdate(); + ASSERT_TRUE(update); + EXPECT_EQ(update.value(), 125); + EXPECT_FALSE(manager.remoteUnidirectionalStreamLimitUpdate()); +} + +TEST_F(QuicStreamManagerTest, StreamLimitNoWindowedUpdate) { + auto& manager = *conn.streamManager; + conn.transportSettings.advertisedInitialMaxStreamsBidi = 100; + manager.refreshTransportSettings(conn.transportSettings); + manager.setStreamLimitWindowingFraction(4); + for (int i = 0; i < 100; i++) { + manager.getStream(i * detail::kStreamIncrement); + } + for (int i = 0; i < 24; i++) { + auto stream = manager.getStream(i * detail::kStreamIncrement); + stream->send.state = StreamSendStates::Closed(); + stream->recv.state = StreamReceiveStates::Closed(); + manager.removeClosedStream(stream->id); + } + auto update = manager.remoteBidirectionalStreamLimitUpdate(); + EXPECT_FALSE(update); +} + +TEST_F(QuicStreamManagerTest, StreamLimitManyWindowedUpdate) { + auto& manager = *conn.streamManager; + conn.transportSettings.advertisedInitialMaxStreamsBidi = 100; + manager.refreshTransportSettings(conn.transportSettings); + manager.setStreamLimitWindowingFraction(4); + for (int i = 0; i < 100; i++) { + manager.getStream(i * detail::kStreamIncrement); + } + for (int i = 0; i < 50; i++) { + auto stream = manager.getStream(i * detail::kStreamIncrement); + stream->send.state = StreamSendStates::Closed(); + stream->recv.state = StreamReceiveStates::Closed(); + manager.removeClosedStream(stream->id); + } + auto update = manager.remoteBidirectionalStreamLimitUpdate(); + ASSERT_TRUE(update); + EXPECT_EQ(update.value(), 150); + EXPECT_FALSE(manager.remoteBidirectionalStreamLimitUpdate()); + EXPECT_FALSE(manager.remoteUnidirectionalStreamLimitUpdate()); +} } // namespace test } // namespace quic