From 35a2d34843a4e6dc63ad14419493c32605fbfade Mon Sep 17 00:00:00 2001 From: Matt Joras Date: Thu, 1 Jun 2023 14:11:31 -0700 Subject: [PATCH] Use a single queue for scheduling DSR and non-DSR streams. Summary: The write loop functions for DSR or non-DSR are segmented today. As such, so are the schedulers. Mirroring this, we also currently store the DSR and non-DSR streams in separate write queues. This makes it impossible to effectively balance between the two without potential priority inversions or starvation. Combining them into a single queue eliminates this possibility, but is not entirely straightforward. The main difficulty comes from the schedulers. The `StreamFrameScheduler` for non-DSR data essentially loops over the control stream queue and the normal write queue looking for the next stream to write to a given packet. When the queues are segmented things are nice and easy. When they are combined, we have to deal with the potential that the non-DSR scheduler will hit a stream with only DSR data. Simply bailing isn't quite correct, since it will just cause an empty write loop. To fix that we need check, after we are finished writing a packet, if the next scheduled stream only has DSR data. If it does, we need to ensure `hasPendingData()` returns false. The same needs to be done in reverse for the DSR stream scheduler. The last major compication is that we need another loop which wraps the two individual write loop functions, and calls both functions until the packet limit is exhausted or there's no more data to write. This is to handle the case where there are, for example, two active streams with the same incremental priority, and one is DSR and the other is not. In this case each write loop we want to write `packetLimit` packets, flip flopping between DSR and non DSR packets. This kind of round robining is pathologically bad for DSR, and a future diff will experiment with changing the round robin behavior such that we write a minimum number of packets per stream before moving on to the next stream. This change also contains some other refactors, such as eliminating `updateLossStreams` from the stream manager. (Note: this ignores all push blocking failures!) Reviewed By: kvtsoy Differential Revision: D46249067 fbshipit-source-id: 56a37c02fef51908c1336266ed40ac6d99bd14d4 --- quic/api/QuicPacketScheduler.cpp | 38 ++++-- quic/api/QuicPacketScheduler.h | 1 + quic/api/QuicTransportFunctions.cpp | 1 - quic/api/test/QuicPacketSchedulerTest.cpp | 118 +++++++++++++++--- quic/api/test/QuicTransportTest.cpp | 6 +- quic/common/test/TestUtils.cpp | 4 +- quic/dsr/backend/test/DSRPacketizerTest.cpp | 5 + quic/dsr/frontend/Scheduler.cpp | 78 ++++++++---- quic/dsr/frontend/Scheduler.h | 9 ++ quic/dsr/frontend/test/WriteFunctionsTest.cpp | 74 ++++++++++- quic/loss/QuicLossFunctions.cpp | 1 - quic/loss/test/QuicLossFunctionsTest.cpp | 15 ++- quic/server/QuicServerTransport.cpp | 34 +++-- quic/state/AckHandlers.cpp | 1 - quic/state/QuicPriorityQueue.h | 13 ++ quic/state/QuicStreamManager.cpp | 59 ++++++--- quic/state/QuicStreamManager.h | 80 ++++++------ quic/state/StreamData.h | 9 ++ quic/state/stream/StreamStateFunctions.cpp | 2 - quic/state/test/QuicStreamFunctionsTest.cpp | 12 +- 20 files changed, 401 insertions(+), 159 deletions(-) diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index d152a5d5b..ba5e05f34 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -458,7 +458,11 @@ void StreamFrameScheduler::writeStreamsHelper( level.iterator->begin(); do { auto streamId = level.iterator->current(); - auto stream = conn_.streamManager->findStream(streamId); + auto stream = CHECK_NOTNULL(conn_.streamManager->findStream(streamId)); + if (!stream->hasSchedulableData() && stream->hasSchedulableDsr()) { + // We hit a DSR stream + return; + } CHECK(stream) << "streamId=" << streamId << "inc=" << uint64_t(level.incremental); if (!writeSingleStream(builder, *stream, connWritableBytes)) { @@ -476,30 +480,40 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) { DCHECK(conn_.streamManager->hasWritable()); uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_); // Write the control streams first as a naive binary priority mechanism. - const auto& writableControlStreams = - conn_.streamManager->writableControlStreams(); - if (!writableControlStreams.empty()) { + const auto& controlWriteQueue = conn_.streamManager->controlWriteQueue(); + if (!controlWriteQueue.empty()) { conn_.schedulingState.nextScheduledControlStream = writeStreamsHelper( builder, - writableControlStreams, + controlWriteQueue, conn_.schedulingState.nextScheduledControlStream, connWritableBytes, conn_.transportSettings.streamFramePerPacket); } - auto& writableStreams = conn_.streamManager->writableStreams(); - if (!writableStreams.empty()) { + auto& writeQueue = conn_.streamManager->writeQueue(); + if (!writeQueue.empty()) { writeStreamsHelper( builder, - writableStreams, + writeQueue, connWritableBytes, conn_.transportSettings.streamFramePerPacket); + // If the next non-control stream is DSR, record that fact in the scheduler + // so that we don't try to write a non DSR stream again. Note that this + // means that in the presence of many large control streams and DSR + // streams, we won't completely prioritize control streams but they + // will not be starved. + auto streamId = writeQueue.getNextScheduledStream(); + auto stream = conn_.streamManager->findStream(streamId); + if (stream && !stream->hasSchedulableData()) { + nextStreamDsr_ = true; + } } -} // namespace quic +} bool StreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasNonDSRLoss() || - (conn_.streamManager->hasNonDSRWritable() && - getSendConnFlowControlBytesWire(conn_) > 0); + return !nextStreamDsr_ && + (conn_.streamManager->hasNonDSRLoss() || + (conn_.streamManager->hasNonDSRWritable() && + getSendConnFlowControlBytesWire(conn_) > 0)); } bool StreamFrameScheduler::writeStreamFrame( diff --git a/quic/api/QuicPacketScheduler.h b/quic/api/QuicPacketScheduler.h index e956958e1..c82824169 100644 --- a/quic/api/QuicPacketScheduler.h +++ b/quic/api/QuicPacketScheduler.h @@ -118,6 +118,7 @@ class StreamFrameScheduler { uint64_t& connWritableBytes); QuicConnectionStateBase& conn_; + bool nextStreamDsr_{false}; }; class AckScheduler { diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 0af0fa5cd..8b7c71d65 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -675,7 +675,6 @@ void updateConnection( stream->newStreamBytesSent += writeStreamFrame.len; } conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); streamBytesSent += writeStreamFrame.len; detailsPerStream.addFrame(writeStreamFrame, newStreamDataWritten); break; diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 5324f3f26..65bdcbddd 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -1218,8 +1218,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) { false); scheduler.writeStreams(builder); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), 0); } @@ -1266,8 +1265,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) { // Force the wraparound initially. scheduler.writeStreams(builder); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), 4); // Should write frames for stream2, stream3, followed by stream1 again. @@ -1333,8 +1331,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) { // The default is to wraparound initially. scheduler.writeStreams(builder1); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), stream2); // Should write frames for stream2, stream3, followed by stream1 again. @@ -1361,6 +1358,97 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) { EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3); } +TEST_F( + QuicPacketSchedulerTest, + StreamFrameSchedulerRoundRobinStreamPerPacketHitsDsr) { + QuicClientConnectionState conn( + FizzClientQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000; + conn.transportSettings.streamFramePerPacket = true; + auto connId = getTestConnectionId(); + StreamFrameScheduler scheduler(conn); + ShortHeader shortHeader1( + ProtectionType::KeyPhaseZero, + connId, + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder1( + conn.udpSendPacketLen, + std::move(shortHeader1), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + auto stream1 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream2 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream3 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream4 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 2, 4096); + auto curBuf = largeBuf.get(); + do { + curBuf->append(curBuf->capacity()); + curBuf = curBuf->next(); + } while (curBuf != largeBuf.get()); + writeDataToQuicStream( + *conn.streamManager->findStream(stream1), std::move(largeBuf), false); + writeDataToQuicStream( + *conn.streamManager->findStream(stream2), + folly::IOBuf::copyBuffer("some data"), + false); + writeDataToQuicStream( + *conn.streamManager->findStream(stream3), + folly::IOBuf::copyBuffer("some data"), + false); + auto sender = std::make_unique(); + ON_CALL(*sender, addSendInstruction(testing::_)) + .WillByDefault(testing::Return(true)); + ON_CALL(*sender, flush()).WillByDefault(testing::Return(true)); + auto dsrStream = conn.streamManager->findStream(stream4); + dsrStream->dsrSender = std::move(sender); + BufferMeta bufMeta(20); + writeDataToQuicStream( + *conn.streamManager->findStream(stream4), + folly::IOBuf::copyBuffer("some data"), + false); + writeBufMetaToQuicStream( + *conn.streamManager->findStream(stream4), bufMeta, true /* eof */); + // Pretend we sent the non DSR data + dsrStream->ackedIntervals.insert(0, dsrStream->writeBuffer.chainLength() - 1); + dsrStream->currentWriteOffset = dsrStream->writeBuffer.chainLength(); + dsrStream->writeBuffer.move(); + conn.streamManager->updateWritableStreams(*dsrStream); + + // The default is to wraparound initially. + scheduler.writeStreams(builder1); + EXPECT_EQ( + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), + stream2); + + // Should write frames for stream2, stream3, followed by an empty write. + NiceMock builder2; + EXPECT_CALL(builder2, remainingSpaceInPkt()).WillRepeatedly(Return(4096)); + EXPECT_CALL(builder2, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) { + builder2.frames_.push_back(f); + })); + auto& frames = builder2.frames_; + ASSERT_TRUE(scheduler.hasPendingData()); + scheduler.writeStreams(builder2); + ASSERT_EQ(frames.size(), 1); + ASSERT_TRUE(scheduler.hasPendingData()); + scheduler.writeStreams(builder2); + ASSERT_EQ(frames.size(), 2); + EXPECT_FALSE(scheduler.hasPendingData()); + scheduler.writeStreams(builder2); + WriteStreamFrame f1(stream2, 0, 9, false); + WriteStreamFrame f2(stream3, 0, 9, false); + ASSERT_TRUE(frames[0].asWriteStreamFrame()); + EXPECT_EQ(*frames[0].asWriteStreamFrame(), f1); + ASSERT_TRUE(frames[1].asWriteStreamFrame()); + EXPECT_EQ(*frames[1].asWriteStreamFrame(), f2); +} + TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) { QuicClientConnectionState conn( FizzClientQuicHandshakeContext::Builder().build()); @@ -1406,7 +1494,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) { // The default is to wraparound initially. scheduler.writeStreams(builder1); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( + conn.streamManager->writeQueue().getNextScheduledStream( Priority(0, false)), stream1); @@ -1473,7 +1561,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) { // The default is to wraparound initially. scheduler.writeStreams(builder1); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( + conn.streamManager->writeQueue().getNextScheduledStream( Priority(0, false)), stream1); @@ -1550,8 +1638,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) { // The default is to wraparound initially. scheduler.writeStreams(builder); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), stream3); EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2); @@ -1578,8 +1665,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) { EXPECT_EQ(*frames[3].asWriteStreamFrame(), f4); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), stream3); EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2); } @@ -1605,8 +1691,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) { writeDataToQuicStream(*stream1, folly::IOBuf::copyBuffer("some data"), false); scheduler.writeStreams(builder); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), 0); } @@ -1644,7 +1729,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRemoveOne) { // Manually remove a stream and set the next scheduled to that stream. builder.frames_.clear(); - conn.streamManager->writableStreams().setNextScheduledStream(stream2); + conn.streamManager->writeQueue().setNextScheduledStream(stream2); conn.streamManager->removeWritable(*conn.streamManager->findStream(stream2)); scheduler.writeStreams(builder); ASSERT_EQ(builder.frames_.size(), 1); @@ -1817,7 +1902,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) { stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0])); stream->retransmissionBuffer.clear(); conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); EXPECT_TRUE(scheduler.hasPendingData()); // Write again @@ -1862,7 +1946,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) { dsrStream->insertIntoLossBufMeta(bufMeta); conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*dsrStream); - conn.streamManager->updateLossStreams(*dsrStream); StreamFrameScheduler scheduler(conn); EXPECT_TRUE(scheduler.hasPendingData()); @@ -1931,7 +2014,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) { stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0])); stream->retransmissionBuffer.clear(); conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); EXPECT_TRUE(scheduler.hasPendingData()); // Write again diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index d9bfe218f..a9a3b013c 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -165,7 +165,6 @@ void dropPackets(QuicServerConnectionState& conn) { std::move(*itr->second)); stream->retransmissionBuffer.erase(itr); conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); } } conn.outstandings.reset(); @@ -295,7 +294,6 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLoss) { } while (curBuf != largeBuf.get()); lossStreamState->lossBuffer.emplace_back(std::move(largeBuf), 31, false); conn.streamManager->updateWritableStreams(*lossStreamState); - conn.streamManager->updateLossStreams(*lossStreamState); transport_->writeChain( stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0); @@ -4262,7 +4260,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) { conn.outstandings.reset(); // Start from stream2 instead of stream1 - conn.streamManager->writableStreams().setNextScheduledStream(s2); + conn.streamManager->writeQueue().setNextScheduledStream(s2); writableBytes = kDefaultUDPSendPacketLen - 100; EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); @@ -4287,7 +4285,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) { conn.outstandings.reset(); // Test wrap around - conn.streamManager->writableStreams().setNextScheduledStream(s2); + conn.streamManager->writeQueue().setNextScheduledStream(s2); writableBytes = kDefaultUDPSendPacketLen; EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); writeQuicDataToSocket( diff --git a/quic/common/test/TestUtils.cpp b/quic/common/test/TestUtils.cpp index 0ae47db1c..49bae9a0b 100644 --- a/quic/common/test/TestUtils.cpp +++ b/quic/common/test/TestUtils.cpp @@ -731,8 +731,8 @@ void overridePacketWithToken( } bool writableContains(QuicStreamManager& streamManager, StreamId streamId) { - return streamManager.writableStreams().count(streamId) > 0 || - streamManager.writableControlStreams().count(streamId) > 0; + return streamManager.writeQueue().count(streamId) > 0 || + streamManager.controlWriteQueue().count(streamId) > 0; } std::unique_ptr diff --git a/quic/dsr/backend/test/DSRPacketizerTest.cpp b/quic/dsr/backend/test/DSRPacketizerTest.cpp index c46e70fc8..809a94cb6 100644 --- a/quic/dsr/backend/test/DSRPacketizerTest.cpp +++ b/quic/dsr/backend/test/DSRPacketizerTest.cpp @@ -140,6 +140,11 @@ TEST_F(DSRMultiWriteTest, TwoRequestsWithLoss) { prepareFlowControlAndStreamLimit(); auto streamId = prepareOneStream(1000); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto bufMetaStartingOffset = stream->writeBufMeta.offset; // Move part of the BufMetas to lossBufMetas auto split = stream->writeBufMeta.split(500); diff --git a/quic/dsr/frontend/Scheduler.cpp b/quic/dsr/frontend/Scheduler.cpp index 713ee74ca..446b93319 100644 --- a/quic/dsr/frontend/Scheduler.cpp +++ b/quic/dsr/frontend/Scheduler.cpp @@ -18,9 +18,34 @@ DSRStreamFrameScheduler::DSRStreamFrameScheduler( : conn_(conn) {} bool DSRStreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasDSRLoss() || - (conn_.streamManager->hasDSRWritable() && - getSendConnFlowControlBytesWire(conn_) > 0); + return !nextStreamNonDsr_ && + (conn_.streamManager->hasDSRLoss() || + (conn_.streamManager->hasDSRWritable() && + getSendConnFlowControlBytesWire(conn_) > 0)); +} + +DSRStreamFrameScheduler::SchedulingResult +DSRStreamFrameScheduler::enrichAndAddSendInstruction( + uint32_t encodedSize, + DSRStreamFrameScheduler::SchedulingResult result, + DSRPacketBuilderBase& packetBuilder, + SendInstruction::Builder& instructionBuilder, + const PriorityQueue& writeQueue, + const PriorityQueue::LevelItr& levelIter, + QuicStreamState& stream) { + enrichInstruction(instructionBuilder, stream); + packetBuilder.addSendInstruction( + instructionBuilder.build(), encodedSize, stream.streamPacketIdx++); + result.writeSuccess = true; + result.sender = stream.dsrSender.get(); + levelIter->iterator->next(); + auto nextStreamId = writeQueue.getNextScheduledStream(); + auto nextStream = + CHECK_NOTNULL(conn_.streamManager->findStream(nextStreamId)); + if (nextStream->hasSchedulableData()) { + nextStreamNonDsr_ = true; + } + return result; } /** @@ -32,22 +57,25 @@ bool DSRStreamFrameScheduler::hasPendingData() const { DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( DSRPacketBuilderBase& builder) { SchedulingResult result; - auto& writableDSRStreams = conn_.streamManager->writableDSRStreams(); + auto& writeQueue = conn_.streamManager->writeQueue(); const auto& levelIter = std::find_if( - writableDSRStreams.levels.cbegin(), - writableDSRStreams.levels.cend(), + writeQueue.levels.cbegin(), + writeQueue.levels.cend(), [&](const auto& level) { return !level.empty(); }); - if (levelIter == writableDSRStreams.levels.cend()) { + if (levelIter == writeQueue.levels.cend()) { return result; } levelIter->iterator->begin(); auto streamId = levelIter->iterator->current(); auto stream = conn_.streamManager->findStream(streamId); CHECK(stream); - CHECK(stream->dsrSender); + if (!stream->dsrSender || !stream->hasSchedulableDsr()) { + nextStreamNonDsr_ = true; + return result; + } bool hasFreshBufMeta = stream->writeBufMeta.length > 0; bool hasLossBufMeta = !stream->lossBufMetas.empty(); - CHECK(hasFreshBufMeta || hasLossBufMeta); + CHECK(stream->hasSchedulableDsr()); if (hasLossBufMeta) { SendInstruction::Builder instructionBuilder(conn_, streamId); auto encodedSize = writeDSRStreamFrame( @@ -64,15 +92,14 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( if (builder.remainingSpace() < encodedSize) { return result; } - enrichInstruction(instructionBuilder, *stream); - builder.addSendInstruction( - instructionBuilder.build(), - /*streamEncodedSize=*/encodedSize, - /*streamPacketIdx=*/stream->streamPacketIdx++); - result.writeSuccess = true; - result.sender = stream->dsrSender.get(); - levelIter->iterator->next(); - return result; + return enrichAndAddSendInstruction( + encodedSize, + std::move(result), + builder, + instructionBuilder, + writeQueue, + levelIter, + *stream); } } if (!hasFreshBufMeta || builder.remainingSpace() == 0) { @@ -109,13 +136,14 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( if (builder.remainingSpace() < encodedSize) { return result; } - enrichInstruction(instructionBuilder, *stream); - builder.addSendInstruction( - instructionBuilder.build(), encodedSize, stream->streamPacketIdx++); - result.writeSuccess = true; - result.sender = stream->dsrSender.get(); - levelIter->iterator->next(); - return result; + return enrichAndAddSendInstruction( + encodedSize, + std::move(result), + builder, + instructionBuilder, + writeQueue, + levelIter, + *stream); } return result; } diff --git a/quic/dsr/frontend/Scheduler.h b/quic/dsr/frontend/Scheduler.h index f31e2e003..4d875c15f 100644 --- a/quic/dsr/frontend/Scheduler.h +++ b/quic/dsr/frontend/Scheduler.h @@ -37,8 +37,17 @@ class DSRStreamFrameScheduler { void enrichInstruction( SendInstruction::Builder& builder, const QuicStreamState& stream); + SchedulingResult enrichAndAddSendInstruction( + uint32_t, + SchedulingResult, + DSRPacketBuilderBase&, + SendInstruction::Builder&, + const PriorityQueue&, + const PriorityQueue::LevelItr&, + QuicStreamState&); private: QuicServerConnectionState& conn_; + bool nextStreamNonDsr_{false}; }; } // namespace quic diff --git a/quic/dsr/frontend/test/WriteFunctionsTest.cpp b/quic/dsr/frontend/test/WriteFunctionsTest.cpp index 588d27c27..46e0f3927 100644 --- a/quic/dsr/frontend/test/WriteFunctionsTest.cpp +++ b/quic/dsr/frontend/test/WriteFunctionsTest.cpp @@ -69,6 +69,11 @@ TEST_F(WriteFunctionsTest, WriteLoopTimeLimit) { auto streamId = prepareOneStream(3000); auto cid = getTestConnectionId(); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto currentBufMetaOffset = stream->writeBufMeta.offset; size_t packetLimit = 2; conn_.lossState.srtt = 100ms; @@ -96,6 +101,11 @@ TEST_F(WriteFunctionsTest, WriteLoopTimeLimitNoLimit) { auto streamId = prepareOneStream(3000); auto cid = getTestConnectionId(); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto currentBufMetaOffset = stream->writeBufMeta.offset; size_t packetLimit = 2; conn_.lossState.srtt = 100ms; @@ -123,6 +133,11 @@ TEST_F(WriteFunctionsTest, WriteTwoInstructions) { prepareFlowControlAndStreamLimit(); auto streamId = prepareOneStream(2000); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto cid = getTestConnectionId(); size_t packetLimit = 20; EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); @@ -136,6 +151,11 @@ TEST_F(WriteFunctionsTest, PacketLimit) { prepareFlowControlAndStreamLimit(); auto streamId = prepareOneStream(2000 * 100); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto mockCongestionController = std::make_unique>(); auto rawCongestionController = mockCongestionController.get(); @@ -157,20 +177,51 @@ TEST_F(WriteFunctionsTest, WriteTwoStreams) { auto streamId2 = prepareOneStream(1000); auto stream1 = conn_.streamManager->findStream(streamId1); auto stream2 = conn_.streamManager->findStream(streamId2); + // Pretend we sent the non DSR data on second stream + stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1); + stream2->currentWriteOffset = stream2->writeBuffer.chainLength(); + stream2->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream2); auto cid = getTestConnectionId(); size_t packetLimit = 20; EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); EXPECT_EQ(1, stream1->retransmissionBufMetas.size()); EXPECT_EQ(1, stream2->retransmissionBufMetas.size()); - // TODO: This needs to be fixed later: The stream and the sender needs to be - // 1:1 in the future. Then there will be two senders for this test case and - // each of them will send out one instruction. EXPECT_EQ(1, countInstructions(streamId1)); EXPECT_EQ(1, countInstructions(streamId2)); EXPECT_EQ(2, conn_.outstandings.packets.size()); EXPECT_TRUE(verifyAllOutstandingsAreDSR()); } +TEST_F(WriteFunctionsTest, WriteThreeStreamsNonDsrAndDsr) { + prepareFlowControlAndStreamLimit(); + auto streamId1 = prepareOneStream(1000); + auto streamId2 = prepareOneStream(1000); + auto streamId3 = prepareOneStream(1000); + auto stream1 = conn_.streamManager->findStream(streamId1); + auto stream2 = conn_.streamManager->findStream(streamId2); + auto stream3 = conn_.streamManager->findStream(streamId3); + auto cid = getTestConnectionId(); + size_t packetLimit = 20; + // First loop only write a single packet because it will find there's non-DSR + // data to write on the next stream. + EXPECT_EQ(1, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); + // Pretend we sent the non DSR data for last stream + stream3->ackedIntervals.insert(0, stream3->writeBuffer.chainLength() - 1); + stream3->currentWriteOffset = stream3->writeBuffer.chainLength(); + stream3->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream3); + EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); + EXPECT_EQ(1, stream1->retransmissionBufMetas.size()); + EXPECT_EQ(1, stream2->retransmissionBufMetas.size()); + EXPECT_EQ(1, stream3->retransmissionBufMetas.size()); + EXPECT_EQ(1, countInstructions(streamId1)); + EXPECT_EQ(1, countInstructions(streamId2)); + EXPECT_EQ(1, countInstructions(streamId3)); + EXPECT_EQ(3, conn_.outstandings.packets.size()); + EXPECT_TRUE(verifyAllOutstandingsAreDSR()); +} + TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) { prepareFlowControlAndStreamLimit(); auto streamId1 = prepareOneStream(2000); @@ -179,6 +230,11 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) { auto stream2 = conn_.streamManager->findStream(streamId2); conn_.streamManager->setStreamPriority(streamId1, Priority{3, false}); conn_.streamManager->setStreamPriority(streamId2, Priority{3, false}); + // Pretend we sent the non DSR data on first stream + stream1->ackedIntervals.insert(0, stream1->writeBuffer.chainLength() - 1); + stream1->currentWriteOffset = stream1->writeBuffer.chainLength(); + stream1->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream1); auto cid = getTestConnectionId(); size_t packetLimit = 2; EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); @@ -198,6 +254,11 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsIncremental) { auto stream2 = conn_.streamManager->findStream(streamId2); conn_.streamManager->setStreamPriority(streamId1, Priority{3, true}); conn_.streamManager->setStreamPriority(streamId2, Priority{3, true}); + // Pretend we sent the non DSR data on second stream + stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1); + stream2->currentWriteOffset = stream2->writeBuffer.chainLength(); + stream2->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream2); auto cid = getTestConnectionId(); size_t packetLimit = 2; EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); @@ -213,6 +274,11 @@ TEST_F(WriteFunctionsTest, LossAndFreshTwoInstructionsInTwoPackets) { prepareFlowControlAndStreamLimit(); auto streamId = prepareOneStream(1000); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto bufMetaStartingOffset = stream->writeBufMeta.offset; // Move part of the BufMetas to lossBufMetas auto split = stream->writeBufMeta.split(500); @@ -246,7 +312,7 @@ TEST_F( // Move part of the BufMetas to lossBufMetas auto split = stream->writeBufMeta.split(500); stream->lossBufMetas.push_back(split); - conn_.streamManager->updateLossStreams(*stream); + conn_.streamManager->updateWritableStreams(*stream); // Zero out conn flow control. conn_.flowControlState.sumCurWriteOffset = conn_.flowControlState.peerAdvertisedMaxOffset; diff --git a/quic/loss/QuicLossFunctions.cpp b/quic/loss/QuicLossFunctions.cpp index 9bdc91148..e8c93245c 100644 --- a/quic/loss/QuicLossFunctions.cpp +++ b/quic/loss/QuicLossFunctions.cpp @@ -200,7 +200,6 @@ void markPacketLoss( stream->retransmissionBufMetas.erase(retxBufMetaItr); } conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); break; } case QuicWriteFrame::Type::WriteCryptoFrame: { diff --git a/quic/loss/test/QuicLossFunctionsTest.cpp b/quic/loss/test/QuicLossFunctionsTest.cpp index dad7b6ad7..a0831e48e 100644 --- a/quic/loss/test/QuicLossFunctionsTest.cpp +++ b/quic/loss/test/QuicLossFunctionsTest.cpp @@ -2348,7 +2348,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { ASSERT_EQ(200, retxBufMetaIter->second.length); ASSERT_FALSE(retxBufMetaIter->second.eof); conn->streamManager->updateWritableStreams(*stream); - conn->streamManager->updateLossStreams(*stream); EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); @@ -2370,7 +2369,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { ASSERT_EQ(400, retxBufMetaIter->second.length); ASSERT_FALSE(retxBufMetaIter->second.eof); conn->streamManager->updateWritableStreams(*stream); - conn->streamManager->updateLossStreams(*stream); EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); @@ -2392,7 +2390,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { ASSERT_EQ(400, retxBufMetaIter->second.length); ASSERT_TRUE(retxBufMetaIter->second.eof); conn->streamManager->updateWritableStreams(*stream); - conn->streamManager->updateLossStreams(*stream); EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_TRUE(conn->streamManager->writableDSRStreams().empty()); @@ -2411,7 +2408,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { EXPECT_EQ(2, stream->retransmissionBufMetas.size()); EXPECT_TRUE(conn->streamManager->hasLoss()); ASSERT_EQ(stream->streamLossCount, 1); - EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); + EXPECT_FALSE(stream->hasWritableBufMeta()); + EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id)); + EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id)); // Lose the 3rd dsr packet: RegularQuicWritePacket packet3(PacketHeader(ShortHeader( @@ -2428,7 +2427,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { EXPECT_EQ(1, stream->retransmissionBufMetas.size()); ASSERT_EQ(stream->streamLossCount, 2); EXPECT_TRUE(conn->streamManager->hasLoss()); - EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); + EXPECT_FALSE(stream->hasWritableBufMeta()); + EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id)); + EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id)); // Lose the 3nd dsr packet, it should be merged together with the first // element in the lossBufMetas: @@ -2446,7 +2447,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { EXPECT_EQ(0, stream->retransmissionBufMetas.size()); ASSERT_EQ(stream->streamLossCount, 3); EXPECT_TRUE(conn->streamManager->hasLoss()); - EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); + EXPECT_FALSE(stream->hasWritableBufMeta()); + EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id)); + EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id)); } TEST_F(QuicLossFunctionsTest, TestReorderingThresholdDSRNormal) { diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 3a916eb4e..9c6d061bb 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -325,13 +325,8 @@ void QuicServerTransport::writeData() { } if (conn_->oneRttWriteCipher) { CHECK(conn_->oneRttWriteHeaderCipher); - // This kind of sucks, but right now we don't have a way to schedule DSR - // and non-DSR streams as part of the same priority queue. This can lead - // to prioritity inversions and starving of one kind of stream over the - // other. To mitigate this, randomly decide whether to write DSR or non - // DSR data first which at least ensures fairness over time. auto writeLoopBeginTime = Clock::now(); - auto nonDsrPath = [&]() { + auto nonDsrPath = [&](auto limit) { return writeQuicDataToSocket( *socket_, *conn_, @@ -340,33 +335,34 @@ void QuicServerTransport::writeData() { *conn_->oneRttWriteCipher, *conn_->oneRttWriteHeaderCipher, version, - packetLimit, + limit, writeLoopBeginTime); }; - auto dsrPath = [&]() { + auto dsrPath = [&](auto limit) { return writePacketizationRequest( *serverConn_, destConnId, - packetLimit, + limit, *conn_->oneRttWriteCipher, writeLoopBeginTime); }; - if (folly::Random::oneIn(2)) { - if (packetLimit && congestionControlWritableBytes(*serverConn_)) { - packetLimit -= dsrPath(); - } - if (packetLimit) { - packetLimit -= nonDsrPath().packetsWritten; - } - } else { - auto written = nonDsrPath(); + // We need a while loop because both paths write streams from the same + // queue, which can result in empty writes. + while (packetLimit) { + auto startingPacketLimit = packetLimit; + // Give the non-DSR path a chance. + auto written = nonDsrPath(packetLimit); // If we didn't write much from the non DSR path, don't penalize DSR // a full packet. if (written.bytesWritten >= conn_->udpSendPacketLen / 2) { packetLimit -= written.packetsWritten; } if (packetLimit && congestionControlWritableBytes(*serverConn_)) { - packetLimit -= dsrPath(); + packetLimit -= dsrPath(packetLimit); + } + if (startingPacketLimit == packetLimit) { + // We haven't written anything with either path, so we're done. + break; } } } diff --git a/quic/state/AckHandlers.cpp b/quic/state/AckHandlers.cpp index 57c867d4b..d57180956 100644 --- a/quic/state/AckHandlers.cpp +++ b/quic/state/AckHandlers.cpp @@ -170,7 +170,6 @@ AckEvent processAckFrame( streamFrame->offset, streamFrame->len, streamFrame->fin); stream->updateAckedIntervals( streamFrame->offset, streamFrame->len, streamFrame->fin); - conn.streamManager->updateLossStreams(*stream); conn.streamManager->updateWritableStreams(*stream); } } diff --git a/quic/state/QuicPriorityQueue.h b/quic/state/QuicPriorityQueue.h index 2159c9979..d910f0707 100644 --- a/quic/state/QuicPriorityQueue.h +++ b/quic/state/QuicPriorityQueue.h @@ -157,6 +157,7 @@ struct PriorityQueue { folly::F14FastMap streamToOrderId; }; std::vector levels; + using LevelItr = decltype(levels)::const_iterator; PriorityQueue() : levels(kDefaultPriorityLevelsSize) { for (size_t index = 0; index < levels.size(); index++) { @@ -258,6 +259,18 @@ struct PriorityQueue { return level.iterator->nextStreamIt->streamId; } + FOLLY_NODISCARD StreamId getNextScheduledStream() const { + const auto& levelIter = + std::find_if(levels.cbegin(), levels.cend(), [&](const auto& level) { + return !level.empty(); + }); + // The expectation is that calling this function on an empty queue is + // a bug. + CHECK(levelIter != levels.cend()); + levelIter->iterator->begin(); + return levelIter->iterator->current(); + } + private: folly::F14FastMap writableStreamsToLevel_; using WSIterator = decltype(writableStreamsToLevel_)::iterator; diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index 9b64da63b..b4ccf8267 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -246,10 +246,7 @@ bool QuicStreamManager::setStreamPriority(StreamId id, Priority newPriority) { } notifyStreamPriorityChanges(); } - // If this stream is already in the writable or loss queues, update the - // priority there. - writableStreams_.updateIfExist(id, stream->priority); - writableDSRStreams_.updateIfExist(id, stream->priority); + writeQueue_.updateIfExist(id, stream->priority); return true; } return false; @@ -560,10 +557,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { DCHECK(it->second.inTerminalStates()); readableStreams_.erase(streamId); peekableStreams_.erase(streamId); - writableStreams_.erase(streamId); - writableDSRStreams_.erase(streamId); - writableControlStreams_.erase(streamId); - removeLoss(streamId); + removeWritable(it->second); blockedStreams_.erase(streamId); deliverableStreams_.erase(streamId); txStreams_.erase(streamId); @@ -644,22 +638,49 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) { CHECK(stream.lossBuffer.empty()); CHECK(stream.lossBufMetas.empty()); removeWritable(stream); - removeDSRWritable(stream); + writableStreams_.erase(stream.id); + writableDSRStreams_.erase(stream.id); + lossStreams_.erase(stream.id); + lossDSRStreams_.erase(stream.id); + if (stream.isControl) { + controlWriteQueue_.erase(stream.id); + } else { + writeQueue_.erase(stream.id); + } return; } - if (stream.hasWritableData() || !stream.lossBuffer.empty()) { - addWritable(stream); + if (stream.hasWritableData()) { + writableStreams_.emplace(stream.id); } else { - removeWritable(stream); + writableStreams_.erase(stream.id); } - if (stream.isControl) { - return; - } - if (stream.dsrSender && - (stream.hasWritableBufMeta() || !stream.lossBufMetas.empty())) { - addDSRWritable(stream); + if (stream.hasWritableBufMeta()) { + writableDSRStreams_.emplace(stream.id); } else { - removeDSRWritable(stream); + writableDSRStreams_.erase(stream.id); + } + if (!stream.lossBuffer.empty()) { + lossStreams_.emplace(stream.id); + } else { + lossStreams_.erase(stream.id); + } + if (!stream.lossBufMetas.empty()) { + lossDSRStreams_.emplace(stream.id); + } else { + lossDSRStreams_.erase(stream.id); + } + if (stream.hasSchedulableData() || stream.hasSchedulableDsr()) { + if (stream.isControl) { + controlWriteQueue_.emplace(stream.id); + } else { + writeQueue_.insertOrUpdate(stream.id, stream.priority); + } + } else { + if (stream.isControl) { + controlWriteQueue_.erase(stream.id); + } else { + writeQueue_.erase(stream.id); + } } } diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index 1f3ea9738..e2642bd30 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -130,9 +130,10 @@ class QuicStreamManager { lossDSRStreams_ = std::move(other.lossDSRStreams_); readableStreams_ = std::move(other.readableStreams_); peekableStreams_ = std::move(other.peekableStreams_); + writeQueue_ = std::move(other.writeQueue_); + controlWriteQueue_ = std::move(other.controlWriteQueue_); writableStreams_ = std::move(other.writableStreams_); writableDSRStreams_ = std::move(other.writableDSRStreams_); - writableControlStreams_ = std::move(other.writableControlStreams_); txStreams_ = std::move(other.txStreams_); deliverableStreams_ = std::move(other.deliverableStreams_); closedStreams_ = std::move(other.closedStreams_); @@ -385,19 +386,6 @@ class QuicStreamManager { lossStreams_.insert(id); } - void updateLossStreams(const QuicStreamState& stream) { - if (!stream.hasLoss()) { - removeLoss(stream.id); - } else { - if (!stream.lossBuffer.empty()) { - lossStreams_.emplace(stream.id); - } - if (!stream.lossBufMetas.empty()) { - lossDSRStreams_.emplace(stream.id); - } - } - } - /** * Update stream priority if the stream indicated by id exists, and the * passed in values are different from current priority. Return true if @@ -423,16 +411,19 @@ class QuicStreamManager { * Returns a mutable reference to the container holding the writable stream * IDs. */ - auto& writableControlStreams() { - return writableControlStreams_; + auto& controlWriteQueue() { + return controlWriteQueue_; + } + + auto& writeQueue() { + return writeQueue_; } /* * Returns if there are any writable streams. */ bool hasWritable() const { - return !writableStreams_.empty() || !writableDSRStreams_.empty() || - !writableControlStreams_.empty(); + return !writeQueue_.empty() || !controlWriteQueue_.empty(); } FOLLY_NODISCARD bool hasDSRWritable() const { @@ -440,7 +431,7 @@ class QuicStreamManager { } bool hasNonDSRWritable() const { - return !writableStreams_.empty() || !writableControlStreams_.empty(); + return !writableStreams_.empty() || !controlWriteQueue_.empty(); } /* @@ -448,17 +439,26 @@ class QuicStreamManager { */ void addWritable(const QuicStreamState& stream) { if (stream.isControl) { - writableControlStreams_.insert(stream.id); + // Control streams get their own queue. + CHECK(stream.hasSchedulableData()); + controlWriteQueue_.insert(stream.id); } else { - CHECK(stream.hasWritableData() || !stream.lossBuffer.empty()); - writableStreams_.insertOrUpdate(stream.id, stream.priority); + CHECK(stream.hasSchedulableData() || stream.hasSchedulableDsr()); + writeQueue_.insertOrUpdate(stream.id, stream.priority); + } + if (stream.hasWritableData()) { + writableStreams_.insert(stream.id); + } + if (stream.hasWritableBufMeta()) { + LOG(ERROR) << "writable DSR: " << stream.id; + writableDSRStreams_.insert(stream.id); + } + if (!stream.lossBuffer.empty()) { + lossStreams_.insert(stream.id); + } + if (!stream.lossBufMetas.empty()) { + lossDSRStreams_.insert(stream.id); } - } - - void addDSRWritable(const QuicStreamState& stream) { - CHECK(!stream.isControl); - CHECK(stream.hasWritableBufMeta() || !stream.lossBufMetas.empty()); - writableDSRStreams_.insertOrUpdate(stream.id, stream.priority); } /* @@ -466,15 +466,14 @@ class QuicStreamManager { */ void removeWritable(const QuicStreamState& stream) { if (stream.isControl) { - writableControlStreams_.erase(stream.id); + controlWriteQueue_.erase(stream.id); } else { - writableStreams_.erase(stream.id); + writeQueue_.erase(stream.id); } - } - - void removeDSRWritable(const QuicStreamState& stream) { - CHECK(!stream.isControl); + writableStreams_.erase(stream.id); writableDSRStreams_.erase(stream.id); + lossStreams_.erase(stream.id); + lossDSRStreams_.erase(stream.id); } /* @@ -483,7 +482,8 @@ class QuicStreamManager { void clearWritable() { writableStreams_.clear(); writableDSRStreams_.clear(); - writableControlStreams_.clear(); + writeQueue_.clear(); + controlWriteQueue_.clear(); } /* @@ -1166,12 +1166,14 @@ class QuicStreamManager { // Set of streams that have pending peeks folly::F14FastSet peekableStreams_; - // Set of !control streams that have writable data - PriorityQueue writableStreams_; - PriorityQueue writableDSRStreams_; + // Set of !control streams that have writable data used for frame scheduling + PriorityQueue writeQueue_; // Set of control streams that have writable data - std::set writableControlStreams_; + std::set controlWriteQueue_; + + folly::F14FastSet writableStreams_; + folly::F14FastSet writableDSRStreams_; // Streams that may be able to call TxCallback folly::F14FastSet txStreams_; diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index 022df6dc4..3379f429c 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -411,6 +411,15 @@ struct QuicStreamState : public QuicStreamLike { return false; } + // Whether this stream has non-DSR data in the write buffer or loss buffer. + FOLLY_NODISCARD bool hasSchedulableData() const { + return hasWritableData() || !lossBuffer.empty(); + } + + FOLLY_NODISCARD bool hasSchedulableDsr() const { + return hasWritableBufMeta() || !lossBufMetas.empty(); + } + FOLLY_NODISCARD bool hasWritableBufMeta() const { if (writeBufMeta.offset == 0) { return false; diff --git a/quic/state/stream/StreamStateFunctions.cpp b/quic/state/stream/StreamStateFunctions.cpp index f343f6f3f..7c7bc9bb4 100644 --- a/quic/state/stream/StreamStateFunctions.cpp +++ b/quic/state/stream/StreamStateFunctions.cpp @@ -27,7 +27,6 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) { } stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream); - stream.conn.streamManager->updateLossStreams(stream); } void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) { @@ -63,7 +62,6 @@ void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) { } stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream); - stream.conn.streamManager->updateLossStreams(stream); QUIC_STATS(stream.conn.statsCallback, onQuicStreamReset, frame.errorCode); } diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index a30ed9a9b..626042d83 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -2236,14 +2236,14 @@ TEST_F(QuicStreamFunctionsTest, LossBufferEmpty) { StreamId id = 4; QuicStreamState stream(id, conn); conn.streamManager->addLoss(id); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_FALSE(conn.streamManager->hasLoss()); } TEST_F(QuicStreamFunctionsTest, LossBufferEmptyNoChange) { StreamId id = 4; QuicStreamState stream(id, conn); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_FALSE(conn.streamManager->hasLoss()); } @@ -2251,7 +2251,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferHasData) { StreamId id = 4; QuicStreamState stream(id, conn); stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); } @@ -2263,7 +2263,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferMetaHasData) { b.setOffset(10); b.setEOF(false); stream.lossBufMetas.emplace_back(b.build()); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); } @@ -2276,7 +2276,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferMetaStillHasData) { b.setOffset(10); b.setEOF(false); stream.lossBufMetas.emplace_back(b.build()); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); } @@ -2285,7 +2285,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferStillHasData) { QuicStreamState stream(id, conn); conn.streamManager->addLoss(id); stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); }