diff --git a/quic/QuicConstants.h b/quic/QuicConstants.h index c9f082848..e7a7d6705 100644 --- a/quic/QuicConstants.h +++ b/quic/QuicConstants.h @@ -171,7 +171,9 @@ BETTER_ENUM( // Controls default stream priority DEFAULT_STREAM_PRIORITY = 0x10003, // Controls write loop time fraction in terms of srtt - WRITE_LOOP_TIME_FRACTION = 0x10004) + WRITE_LOOP_TIME_FRACTION = 0x10004, + // Controls number of times a stream gets a write in incremental mode + WRITES_PER_STREAM = 0x10005) enum class FrameType : uint64_t { PADDING = 0x00, diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index ba5e05f34..5617b3469 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -468,7 +468,12 @@ void StreamFrameScheduler::writeStreamsHelper( if (!writeSingleStream(builder, *stream, connWritableBytes)) { break; } - level.iterator->next(); + auto remainingSpaceAfter = builder.remainingSpaceInPkt(); + // If we wrote a stream frame and there's still space in the packet, + // that implies we ran out of data or flow control on the stream and + // we should bypass the nextsPerStream in the priority queue. 
+ bool forceNext = remainingSpaceAfter > 0; + level.iterator->next(forceNext); if (streamPerPacket) { return; } diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 65bdcbddd..e4cd8a823 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -1288,6 +1288,86 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) { EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3); } +TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinNextsPer) { + QuicClientConnectionState conn( + FizzClientQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.streamManager->writeQueue().setMaxNextsPerStream(2); + conn.transportSettings.defaultPriority = Priority(0, true); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000; + auto connId = getTestConnectionId(); + StreamFrameScheduler scheduler(conn); + ShortHeader shortHeader1( + ProtectionType::KeyPhaseZero, + connId, + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder( + conn.udpSendPacketLen, + std::move(shortHeader1), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + builder.encodePacketHeader(); + auto stream1 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream2 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream3 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 2, 4096); + auto curBuf = largeBuf.get(); + do { + curBuf->append(curBuf->capacity()); + curBuf = curBuf->next(); + } while (curBuf != largeBuf.get()); + writeDataToQuicStream( + *conn.streamManager->findStream(stream1), std::move(largeBuf), false); + writeDataToQuicStream( + *conn.streamManager->findStream(stream2), + 
folly::IOBuf::copyBuffer("some data"), + false); + writeDataToQuicStream( + *conn.streamManager->findStream(stream3), + folly::IOBuf::copyBuffer("some data"), + false); + // Should write frames for stream1, stream1, stream2, stream3, followed by + // stream1 again. + NiceMock builder2; + std::vector remainingRet = {1500, 0, 1400, 0, 1300, 1100, 1000, 0}; + auto remainingItr = remainingRet.begin(); + + EXPECT_CALL(builder2, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) { + builder2.frames_.push_back(f); + remainingItr++; + })); + EXPECT_CALL(builder2, remainingSpaceInPkt()).WillRepeatedly(Invoke([&]() { + return *remainingItr; + })); + scheduler.writeStreams(builder2); + remainingItr++; + ASSERT_EQ(conn.streamManager->writeQueue().getNextScheduledStream(), stream1); + ASSERT_EQ(builder2.frames_.size(), 1); + ASSERT_EQ(*remainingItr, 1400); + scheduler.writeStreams(builder2); + ASSERT_EQ(builder2.frames_.size(), 2); + ASSERT_EQ(conn.streamManager->writeQueue().getNextScheduledStream(), stream2); + ASSERT_EQ(*remainingItr, 0); + remainingItr++; + scheduler.writeStreams(builder2); + scheduler.writeStreams(builder2); + auto& frames = builder2.frames_; + ASSERT_EQ(frames.size(), 5); + ASSERT_TRUE(frames[0].asWriteStreamFrame()); + EXPECT_EQ(frames[0].asWriteStreamFrame()->streamId, stream1); + ASSERT_TRUE(frames[1].asWriteStreamFrame()); + EXPECT_EQ(frames[1].asWriteStreamFrame()->streamId, stream1); + ASSERT_TRUE(frames[2].asWriteStreamFrame()); + EXPECT_EQ(frames[2].asWriteStreamFrame()->streamId, stream2); + ASSERT_TRUE(frames[3].asWriteStreamFrame()); + EXPECT_EQ(frames[3].asWriteStreamFrame()->streamId, stream3); + ASSERT_TRUE(frames[4].asWriteStreamFrame()); + EXPECT_EQ(frames[4].asWriteStreamFrame()->streamId, stream1); +} + TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) { QuicClientConnectionState conn( FizzClientQuicHandshakeContext::Builder().build()); @@ -2075,7 +2155,7 @@ TEST_F(QuicPacketSchedulerTest, 
RunOutFlowControlDuringStreamWrite) { auto packet1 = std::move(builder1).buildPacket().packet; updateConnection( conn, folly::none, packet1, Clock::now(), 1200, 0, false /* isDSR */); - EXPECT_EQ(2, packet1.frames.size()); + ASSERT_EQ(2, packet1.frames.size()); auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); EXPECT_EQ(streamId1, writeStreamFrame1.streamId); EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); diff --git a/quic/codec/QuicWriteCodec.cpp b/quic/codec/QuicWriteCodec.cpp index c6e7d5fb0..9941e651d 100644 --- a/quic/codec/QuicWriteCodec.cpp +++ b/quic/codec/QuicWriteCodec.cpp @@ -161,7 +161,6 @@ folly::Optional writeStreamFrameHeader( streamType.hasFin(), false /* fromBufMetaIn */, streamGroupId)); - DCHECK(dataLen <= builder.remainingSpaceInPkt()); return folly::make_optional(dataLen); } diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 9c6d061bb..1dd6ec541 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -1078,6 +1078,16 @@ void QuicServerTransport::registerAllTransportKnobParamHandlers() { serverConn->transportSettings.writeLimitRttFraction = val; VLOG(3) << "WRITE_LOOP_TIME_FRACTION KnobParam received: " << val; }); + registerTransportKnobParamHandler( + static_cast(TransportKnobParamId::WRITES_PER_STREAM), + [](QuicServerTransport* serverTransport, TransportKnobParam::Val value) { + CHECK(serverTransport); + auto val = std::get(value); + auto serverConn = serverTransport->serverConn_; + serverConn->transportSettings.priorityQueueWritesPerStream = val; + serverConn->streamManager->writeQueue().setMaxNextsPerStream(val); + VLOG(3) << "WRITES_PER_STREAM KnobParam received: " << val; + }); } QuicConnectionStats QuicServerTransport::getConnectionsStats() const { diff --git a/quic/server/test/QuicServerTransportTest.cpp b/quic/server/test/QuicServerTransportTest.cpp index a8644f556..023a190d1 100644 --- a/quic/server/test/QuicServerTransportTest.cpp 
+++ b/quic/server/test/QuicServerTransportTest.cpp @@ -4841,6 +4841,10 @@ TEST_F(QuicServerTransportTest, TestAckFrequencyPolicyKnobHandler) { {{static_cast(TransportKnobParamId::WRITE_LOOP_TIME_FRACTION), uint64_t(2)}}); EXPECT_EQ(server->getTransportSettings().writeLimitRttFraction, 2); + server->handleKnobParams( + {{static_cast(TransportKnobParamId::WRITES_PER_STREAM), + uint64_t(5)}}); + EXPECT_EQ(server->getTransportSettings().priorityQueueWritesPerStream, 5); } TEST_F(QuicServerTransportTest, TestSetMaxPacingRateLifecycle) { diff --git a/quic/state/QuicPriorityQueue.h b/quic/state/QuicPriorityQueue.h index d910f0707..e288a8d2d 100644 --- a/quic/state/QuicPriorityQueue.h +++ b/quic/state/QuicPriorityQueue.h @@ -69,18 +69,22 @@ struct PriorityQueue { const Level& level; public: - explicit Iterator(const Level& inLevel) - : level(inLevel), nextStreamIt(level.streams.end()) {} + explicit Iterator(const Level& inLevel, uint64_t maxNexts) + : level(inLevel), + maxNextsPerStream(maxNexts), + nextStreamIt(level.streams.end()) {} virtual ~Iterator() = default; virtual void begin() const = 0; virtual bool end() const = 0; virtual StreamId current() const { return nextStreamIt->streamId; } - virtual void next() = 0; + virtual void next(bool force = false) = 0; virtual void override(OrderedStreamSet::const_iterator it) { nextStreamIt = it; } + mutable uint64_t nextsSoFar{0}; + uint64_t maxNextsPerStream{1}; mutable OrderedStreamSet::const_iterator nextStreamIt; }; @@ -89,8 +93,8 @@ struct PriorityQueue { mutable OrderedStreamSet::const_iterator startStreamIt; public: - explicit IncrementalIterator(const Level& inLevel) - : Iterator(inLevel), startStreamIt(level.streams.end()) {} + explicit IncrementalIterator(const Level& inLevel, uint64_t maxNexts) + : Iterator(inLevel, maxNexts), startStreamIt(level.streams.end()) {} void begin() const override { if (nextStreamIt == level.streams.end()) { nextStreamIt = level.streams.begin(); @@ -100,25 +104,31 @@ struct PriorityQueue 
{ bool end() const override { return nextStreamIt == startStreamIt; } - void next() override { + // force will ignore the max nexts and always move to the next stream. + void next(bool force) override { CHECK(!level.empty()); + if (!force && ++nextsSoFar < maxNextsPerStream) { + return; + } nextStreamIt++; if (nextStreamIt == level.streams.end()) { nextStreamIt = level.streams.begin(); } + nextsSoFar = 0; } }; class SequentialIterator : public Iterator { public: - explicit SequentialIterator(const Level& inLevel) : Iterator(inLevel) {} + explicit SequentialIterator(const Level& inLevel, uint64_t maxNexts) + : Iterator(inLevel, maxNexts) {} void begin() const override { nextStreamIt = level.streams.begin(); } bool end() const override { return nextStreamIt == level.streams.end(); } - void next() override { + void next(bool) override { CHECK(!level.empty()); nextStreamIt++; } @@ -127,7 +137,6 @@ struct PriorityQueue { OrderedStreamSet streams; bool incremental{false}; std::unique_ptr iterator; - FOLLY_NODISCARD bool empty() const { return streams.empty(); } @@ -158,16 +167,26 @@ struct PriorityQueue { }; std::vector levels; using LevelItr = decltype(levels)::const_iterator; + // This controls how many times next() needs to be called before moving + // onto the next stream.
+ uint64_t maxNextsPerStream{1}; + + void setMaxNextsPerStream(uint64_t maxNexts) { + maxNextsPerStream = maxNexts; + for (auto& l : levels) { + l.iterator->maxNextsPerStream = maxNexts; + } + } PriorityQueue() : levels(kDefaultPriorityLevelsSize) { for (size_t index = 0; index < levels.size(); index++) { if (index % 2 == 1) { levels[index].incremental = true; - levels[index].iterator = - std::make_unique(levels[index]); + levels[index].iterator = std::make_unique( + levels[index], maxNextsPerStream); } else { - levels[index].iterator = - std::make_unique(levels[index]); + levels[index].iterator = std::make_unique( + levels[index], maxNextsPerStream); } } } @@ -286,6 +305,7 @@ struct PriorityQueue { } if (streamIt == level.iterator->nextStreamIt) { level.iterator->nextStreamIt = level.streams.erase(streamIt); + level.iterator->nextsSoFar = 0; } else { level.streams.erase(streamIt); } diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index e2642bd30..66487dbc6 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -60,6 +60,8 @@ class QuicStreamManager { nextBidirectionalStreamGroupId_ = nextBidirectionalStreamId_; nextUnidirectionalStreamGroupId_ = nextUnidirectionalStreamId_; refreshTransportSettings(transportSettings); + writeQueue_.setMaxNextsPerStream( + transportSettings.priorityQueueWritesPerStream); } /** diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index f8abd63be..4d3c72aff 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -271,6 +271,10 @@ struct TransportSettings { // The default priority to instantiate streams with. Priority defaultPriority{kDefaultPriority}; + // How many times we will schedule a stream to packets before moving onto + // the next one in the queue. Only relevant for incremental priority. + uint64_t priorityQueueWritesPerStream{1}; + // Local configuration for ACK receive timestamps.
// // Determines the ACK receive timestamp configuration sent to peer,