From 6ecdb35adedc32dc671607b20ff98711a8c265ea Mon Sep 17 00:00:00 2001 From: Matt Joras Date: Thu, 1 Jun 2023 14:11:31 -0700 Subject: [PATCH] Minimum packets per stream before next() moves forward Summary: The idea here is to allow a way to do incremental stream priorities while not switching between streams as aggressively. Achieving this is somewhat tricky. The easiest place to track this is to make it so the iterator in QuicPriorityQueue no-op next() until a counter reaches an increment. This is especially impactful for DSR, where round robining per packet is almost pathologically bad both for CPU impact and spurious losses and low bandwidth estimates. Thoughts? (Note: this ignores all push blocking failures!) Reviewed By: kvtsoy Differential Revision: D46268308 fbshipit-source-id: cd5b924141365f61f8a3363bc9cb38a62e5c94cf --- quic/QuicConstants.h | 4 +- quic/api/QuicPacketScheduler.cpp | 7 +- quic/api/test/QuicPacketSchedulerTest.cpp | 82 +++++++++++++++++++- quic/codec/QuicWriteCodec.cpp | 1 - quic/server/QuicServerTransport.cpp | 10 +++ quic/server/test/QuicServerTransportTest.cpp | 4 + quic/state/QuicPriorityQueue.h | 46 +++++++---- quic/state/QuicStreamManager.h | 2 + quic/state/TransportSettings.h | 4 + 9 files changed, 143 insertions(+), 17 deletions(-) diff --git a/quic/QuicConstants.h b/quic/QuicConstants.h index c9f082848..e7a7d6705 100644 --- a/quic/QuicConstants.h +++ b/quic/QuicConstants.h @@ -171,7 +171,9 @@ BETTER_ENUM( // Controls default stream priority DEFAULT_STREAM_PRIORITY = 0x10003, // Controls write loop time fraction in terms of srtt - WRITE_LOOP_TIME_FRACTION = 0x10004) + WRITE_LOOP_TIME_FRACTION = 0x10004, + // Controls number of times a stream gets a write in incremental mode + WRITES_PER_STREAM = 0x10005) enum class FrameType : uint64_t { PADDING = 0x00, diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index ba5e05f34..5617b3469 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ 
b/quic/api/QuicPacketScheduler.cpp @@ -468,7 +468,12 @@ void StreamFrameScheduler::writeStreamsHelper( if (!writeSingleStream(builder, *stream, connWritableBytes)) { break; } - level.iterator->next(); + auto remainingSpaceAfter = builder.remainingSpaceInPkt(); + // If we wrote a stream frame and there's still space in the packet, + // that implies we ran out of data or flow control on the stream and + // we should bypass the nextsPerStream in the priority queue. + bool forceNext = remainingSpaceAfter > 0; + level.iterator->next(forceNext); if (streamPerPacket) { return; } diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 65bdcbddd..e4cd8a823 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -1288,6 +1288,86 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) { EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3); } +TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinNextsPer) { + QuicClientConnectionState conn( + FizzClientQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.streamManager->writeQueue().setMaxNextsPerStream(2); + conn.transportSettings.defaultPriority = Priority(0, true); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000; + auto connId = getTestConnectionId(); + StreamFrameScheduler scheduler(conn); + ShortHeader shortHeader1( + ProtectionType::KeyPhaseZero, + connId, + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder( + conn.udpSendPacketLen, + std::move(shortHeader1), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + builder.encodePacketHeader(); + auto stream1 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream2 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto 
stream3 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 2, 4096); + auto curBuf = largeBuf.get(); + do { + curBuf->append(curBuf->capacity()); + curBuf = curBuf->next(); + } while (curBuf != largeBuf.get()); + writeDataToQuicStream( + *conn.streamManager->findStream(stream1), std::move(largeBuf), false); + writeDataToQuicStream( + *conn.streamManager->findStream(stream2), + folly::IOBuf::copyBuffer("some data"), + false); + writeDataToQuicStream( + *conn.streamManager->findStream(stream3), + folly::IOBuf::copyBuffer("some data"), + false); + // Should write frames for stream1, stream1, stream2, stream3, followed by + // stream1 again. + NiceMock builder2; + std::vector remainingRet = {1500, 0, 1400, 0, 1300, 1100, 1000, 0}; + auto remainingItr = remainingRet.begin(); + + EXPECT_CALL(builder2, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) { + builder2.frames_.push_back(f); + remainingItr++; + })); + EXPECT_CALL(builder2, remainingSpaceInPkt()).WillRepeatedly(Invoke([&]() { + return *remainingItr; + })); + scheduler.writeStreams(builder2); + remainingItr++; + ASSERT_EQ(conn.streamManager->writeQueue().getNextScheduledStream(), stream1); + ASSERT_EQ(builder2.frames_.size(), 1); + ASSERT_EQ(*remainingItr, 1400); + scheduler.writeStreams(builder2); + ASSERT_EQ(builder2.frames_.size(), 2); + ASSERT_EQ(conn.streamManager->writeQueue().getNextScheduledStream(), stream2); + ASSERT_EQ(*remainingItr, 0); + remainingItr++; + scheduler.writeStreams(builder2); + scheduler.writeStreams(builder2); + auto& frames = builder2.frames_; + ASSERT_EQ(frames.size(), 5); + ASSERT_TRUE(frames[0].asWriteStreamFrame()); + EXPECT_EQ(frames[0].asWriteStreamFrame()->streamId, stream1); + ASSERT_TRUE(frames[1].asWriteStreamFrame()); + EXPECT_EQ(frames[1].asWriteStreamFrame()->streamId, stream1); + ASSERT_TRUE(frames[2].asWriteStreamFrame()); + EXPECT_EQ(frames[2].asWriteStreamFrame()->streamId, stream2); 
+ ASSERT_TRUE(frames[3].asWriteStreamFrame()); + EXPECT_EQ(frames[3].asWriteStreamFrame()->streamId, stream3); + ASSERT_TRUE(frames[4].asWriteStreamFrame()); + EXPECT_EQ(frames[4].asWriteStreamFrame()->streamId, stream1); +} + TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) { QuicClientConnectionState conn( FizzClientQuicHandshakeContext::Builder().build()); @@ -2075,7 +2155,7 @@ TEST_F(QuicPacketSchedulerTest, RunOutFlowControlDuringStreamWrite) { auto packet1 = std::move(builder1).buildPacket().packet; updateConnection( conn, folly::none, packet1, Clock::now(), 1200, 0, false /* isDSR */); - EXPECT_EQ(2, packet1.frames.size()); + ASSERT_EQ(2, packet1.frames.size()); auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); EXPECT_EQ(streamId1, writeStreamFrame1.streamId); EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); diff --git a/quic/codec/QuicWriteCodec.cpp b/quic/codec/QuicWriteCodec.cpp index c6e7d5fb0..9941e651d 100644 --- a/quic/codec/QuicWriteCodec.cpp +++ b/quic/codec/QuicWriteCodec.cpp @@ -161,7 +161,6 @@ folly::Optional writeStreamFrameHeader( streamType.hasFin(), false /* fromBufMetaIn */, streamGroupId)); - DCHECK(dataLen <= builder.remainingSpaceInPkt()); return folly::make_optional(dataLen); } diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 9c6d061bb..1dd6ec541 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -1078,6 +1078,16 @@ void QuicServerTransport::registerAllTransportKnobParamHandlers() { serverConn->transportSettings.writeLimitRttFraction = val; VLOG(3) << "WRITE_LOOP_TIME_FRACTION KnobParam received: " << val; }); + registerTransportKnobParamHandler( + static_cast(TransportKnobParamId::WRITES_PER_STREAM), + [](QuicServerTransport* serverTransport, TransportKnobParam::Val value) { + CHECK(serverTransport); + auto val = std::get(value); + auto serverConn = serverTransport->serverConn_; + 
serverConn->transportSettings.priorityQueueWritesPerStream = val; + serverConn->streamManager->writeQueue().setMaxNextsPerStream(val); + VLOG(3) << "WRITES_PER_STREAM KnobParam received: " << val; + }); } QuicConnectionStats QuicServerTransport::getConnectionsStats() const { diff --git a/quic/server/test/QuicServerTransportTest.cpp b/quic/server/test/QuicServerTransportTest.cpp index a8644f556..023a190d1 100644 --- a/quic/server/test/QuicServerTransportTest.cpp +++ b/quic/server/test/QuicServerTransportTest.cpp @@ -4841,6 +4841,10 @@ TEST_F(QuicServerTransportTest, TestAckFrequencyPolicyKnobHandler) { {{static_cast(TransportKnobParamId::WRITE_LOOP_TIME_FRACTION), uint64_t(2)}}); EXPECT_EQ(server->getTransportSettings().writeLimitRttFraction, 2); + server->handleKnobParams( + {{static_cast(TransportKnobParamId::WRITES_PER_STREAM), + uint64_t(5)}}); + EXPECT_EQ(server->getTransportSettings().priorityQueueWritesPerStream, 5); } TEST_F(QuicServerTransportTest, TestSetMaxPacingRateLifecycle) { diff --git a/quic/state/QuicPriorityQueue.h b/quic/state/QuicPriorityQueue.h index d910f0707..e288a8d2d 100644 --- a/quic/state/QuicPriorityQueue.h +++ b/quic/state/QuicPriorityQueue.h @@ -69,18 +69,22 @@ struct PriorityQueue { const Level& level; public: - explicit Iterator(const Level& inLevel) - : level(inLevel), nextStreamIt(level.streams.end()) {} + explicit Iterator(const Level& inLevel, uint64_t maxNexts) + : level(inLevel), + maxNextsPerStream(maxNexts), + nextStreamIt(level.streams.end()) {} virtual ~Iterator() = default; virtual void begin() const = 0; virtual bool end() const = 0; virtual StreamId current() const { return nextStreamIt->streamId; } - virtual void next() = 0; + virtual void next(bool force = false) = 0; virtual void override(OrderedStreamSet::const_iterator it) { nextStreamIt = it; } + mutable uint64_t nextsSoFar{0}; + uint64_t maxNextsPerStream{1}; mutable OrderedStreamSet::const_iterator nextStreamIt; }; @@ -89,8 +93,8 @@ struct PriorityQueue { mutable 
OrderedStreamSet::const_iterator startStreamIt; public: - explicit IncrementalIterator(const Level& inLevel) - : Iterator(inLevel), startStreamIt(level.streams.end()) {} + explicit IncrementalIterator(const Level& inLevel, uint64_t maxNexts) + : Iterator(inLevel, maxNexts), startStreamIt(level.streams.end()) {} void begin() const override { if (nextStreamIt == level.streams.end()) { nextStreamIt = level.streams.begin(); @@ -100,25 +104,31 @@ struct PriorityQueue { bool end() const override { return nextStreamIt == startStreamIt; } - void next() override { + // force will ignore the max nexts and always moves to the next stream. + void next(bool force) override { CHECK(!level.empty()); + if (!force && ++nextsSoFar < maxNextsPerStream) { + return; + } nextStreamIt++; if (nextStreamIt == level.streams.end()) { nextStreamIt = level.streams.begin(); } + nextsSoFar = 0; } }; class SequentialIterator : public Iterator { public: - explicit SequentialIterator(const Level& inLevel) : Iterator(inLevel) {} + explicit SequentialIterator(const Level& inLevel, uint64_t maxNexts) + : Iterator(inLevel, maxNexts) {} void begin() const override { nextStreamIt = level.streams.begin(); } bool end() const override { return nextStreamIt == level.streams.end(); } - void next() override { + void next(bool) override { CHECK(!level.empty()); nextStreamIt++; } @@ -127,7 +137,6 @@ struct PriorityQueue { OrderedStreamSet streams; bool incremental{false}; std::unique_ptr iterator; - FOLLY_NODISCARD bool empty() const { return streams.empty(); } @@ -158,16 +167,26 @@ struct PriorityQueue { }; std::vector levels; using LevelItr = decltype(levels)::const_iterator; + // This controls how many times next() needs to be called before moving + // onto the next stream. 
+ uint64_t maxNextsPerStream{1}; + + void setMaxNextsPerStream(uint64_t maxNexts) { + maxNextsPerStream = maxNexts; + for (auto& l : levels) { + l.iterator->maxNextsPerStream = maxNexts; + } + } PriorityQueue() : levels(kDefaultPriorityLevelsSize) { for (size_t index = 0; index < levels.size(); index++) { if (index % 2 == 1) { levels[index].incremental = true; - levels[index].iterator = - std::make_unique(levels[index]); + levels[index].iterator = std::make_unique( + levels[index], maxNextsPerStream); } else { - levels[index].iterator = - std::make_unique(levels[index]); + levels[index].iterator = std::make_unique( + levels[index], maxNextsPerStream); } } } @@ -286,6 +305,7 @@ struct PriorityQueue { } if (streamIt == level.iterator->nextStreamIt) { level.iterator->nextStreamIt = level.streams.erase(streamIt); + level.iterator->nextsSoFar = 0; } else { level.streams.erase(streamIt); } diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index e2642bd30..66487dbc6 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -60,6 +60,8 @@ class QuicStreamManager { nextBidirectionalStreamGroupId_ = nextBidirectionalStreamId_; nextUnidirectionalStreamGroupId_ = nextUnidirectionalStreamId_; refreshTransportSettings(transportSettings); + writeQueue_.setMaxNextsPerStream( + transportSettings.priorityQueueWritesPerStream); } /** diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index f8abd63be..4d3c72aff 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -271,6 +271,10 @@ struct TransportSettings { // The default priority to instantiate streams with. Priority defaultPriority{kDefaultPriority}; + // How many times we will schedule a stream to packets before moving onto + // the next one in the queue. Only relevant for incremental priority. + uint64_t priorityQueueWritesPerStream{1}; + // Local configuration for ACK receive timestamps. 
// // Determines the ACK receive timestamp configuration sent to peer,