diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index a658e0821..596129ad5 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -2818,4 +2818,58 @@ TEST_F(QuicPacketSchedulerTest, RstStreamSchedulerReliableReset) { EXPECT_FALSE(conn.pendingEvents.resets.contains(stream->id)); } +TEST_F(QuicPacketSchedulerTest, PausedPriorityInitial) { + static const auto kSequentialPriority = Priority(3, false); + static const auto kPausedPriority = Priority(0, false, 0, true /* paused */); + QuicServerConnectionState conn( + FizzServerQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000; + auto pausedStreamId = + (*conn.streamManager->createNextBidirectionalStream())->id; + auto regularStreamId = + (*conn.streamManager->createNextBidirectionalStream())->id; + auto pausedStream = conn.streamManager->findStream(pausedStreamId); + auto regularStream = conn.streamManager->findStream(regularStreamId); + pausedStream->priority = kPausedPriority; + regularStream->priority = kSequentialPriority; + + writeDataToQuicStream( + *conn.streamManager->findStream(pausedStream->id), + folly::IOBuf::copyBuffer("paused_data"), + false); + writeDataToQuicStream( + *conn.streamManager->findStream(regularStream->id), + folly::IOBuf::copyBuffer("regular_data"), + false); + + // Should write frames for only regular stream. + StreamFrameScheduler scheduler(conn); + NiceMock mockBuilder; + EXPECT_CALL(mockBuilder, remainingSpaceInPkt()).WillRepeatedly(Return(4096)); + EXPECT_CALL(mockBuilder, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) { + mockBuilder.frames_.push_back(f); + })); + scheduler.writeStreams(mockBuilder); + auto& frames = mockBuilder.frames_; + ASSERT_EQ(frames.size(), 1); + WriteStreamFrame regularFrame(regularStream->id, 0, 12, false); + ASSERT_TRUE(frames[0].asWriteStreamFrame()); + EXPECT_EQ(*frames[0].asWriteStreamFrame(), regularFrame); + conn.streamManager->removeWritable(*regularStream); + + // Unpause the stream. Expect the scheduleor to write the data. + conn.streamManager->setStreamPriority(pausedStreamId, kSequentialPriority); + scheduler.writeStreams(mockBuilder); + ASSERT_EQ(frames.size(), 2); + WriteStreamFrame pausedFrame(pausedStream->id, 0, 11, false); + ASSERT_TRUE(frames[1].asWriteStreamFrame()); + EXPECT_EQ(*frames[1].asWriteStreamFrame(), pausedFrame); + + // Pause the stream again. Expect no more data writable. + conn.streamManager->setStreamPriority(pausedStreamId, kPausedPriority); + ASSERT_FALSE(conn.streamManager->hasWritable()); +} + } // namespace quic::test diff --git a/quic/state/QuicPriorityQueue.h b/quic/state/QuicPriorityQueue.h index 2aaf18ca8..e17a66ebd 100644 --- a/quic/state/QuicPriorityQueue.h +++ b/quic/state/QuicPriorityQueue.h @@ -41,14 +41,15 @@ using OrderedStreamSet = std::set; struct Priority { uint8_t level : 3; bool incremental : 1; - OrderId orderId : 58; + OrderId orderId : 57; + bool paused : 1; - Priority(uint8_t l, bool i, OrderId o = 0) - : level(l), incremental(i), orderId(o) {} + Priority(uint8_t l, bool i, OrderId o = 0, bool p = false) + : level(l), incremental(i), orderId(o), paused(p) {} bool operator==(Priority other) const noexcept { return level == other.level && incremental == other.incremental && - orderId == other.orderId; + orderId == other.orderId && paused == other.paused; } }; @@ -212,11 +213,13 @@ struct PriorityQueue { void updateIfExist(StreamId id, Priority priority) { auto iter = writableStreamsToLevel_.find(id); if (iter != writableStreamsToLevel_.end()) { + CHECK(!priority.paused); updateExistingStreamPriority(iter, priority); } } void insertOrUpdate(StreamId id, Priority pri) { + CHECK(!pri.paused); auto it = writableStreamsToLevel_.find(id); auto index = priority2index(pri); if (it != writableStreamsToLevel_.end()) { diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index 8432bdeb5..597d40c4d 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -239,6 +239,7 @@ bool QuicStreamManager::setStreamPriority(StreamId id, Priority newPriority) { } notifyStreamPriorityChanges(); } + updateWritableStreams(*stream); writeQueue_.updateIfExist(id, stream->priority); return true; } @@ -669,6 +670,10 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) { removeWritable(stream); return; } + if (stream.priority.paused) { + removeWritable(stream); + return; + } if (stream.hasWritableData()) { writableStreams_.emplace(stream.id); } else {