diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index d152a5d5b..ba5e05f34 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -458,7 +458,11 @@ void StreamFrameScheduler::writeStreamsHelper( level.iterator->begin(); do { auto streamId = level.iterator->current(); - auto stream = conn_.streamManager->findStream(streamId); + auto stream = CHECK_NOTNULL(conn_.streamManager->findStream(streamId)); + if (!stream->hasSchedulableData() && stream->hasSchedulableDsr()) { + // We hit a DSR stream + return; + } CHECK(stream) << "streamId=" << streamId << "inc=" << uint64_t(level.incremental); if (!writeSingleStream(builder, *stream, connWritableBytes)) { @@ -476,30 +480,40 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) { DCHECK(conn_.streamManager->hasWritable()); uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_); // Write the control streams first as a naive binary priority mechanism. - const auto& writableControlStreams = - conn_.streamManager->writableControlStreams(); - if (!writableControlStreams.empty()) { + const auto& controlWriteQueue = conn_.streamManager->controlWriteQueue(); + if (!controlWriteQueue.empty()) { conn_.schedulingState.nextScheduledControlStream = writeStreamsHelper( builder, - writableControlStreams, + controlWriteQueue, conn_.schedulingState.nextScheduledControlStream, connWritableBytes, conn_.transportSettings.streamFramePerPacket); } - auto& writableStreams = conn_.streamManager->writableStreams(); - if (!writableStreams.empty()) { + auto& writeQueue = conn_.streamManager->writeQueue(); + if (!writeQueue.empty()) { writeStreamsHelper( builder, - writableStreams, + writeQueue, connWritableBytes, conn_.transportSettings.streamFramePerPacket); + // If the next non-control stream is DSR, record that fact in the scheduler + // so that we don't try to write a non DSR stream again. Note that this + // means that in the presence of many large control streams and DSR + // streams, we won't completely prioritize control streams but they + // will not be starved. + auto streamId = writeQueue.getNextScheduledStream(); + auto stream = conn_.streamManager->findStream(streamId); + if (stream && !stream->hasSchedulableData()) { + nextStreamDsr_ = true; + } } -} // namespace quic +} bool StreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasNonDSRLoss() || - (conn_.streamManager->hasNonDSRWritable() && - getSendConnFlowControlBytesWire(conn_) > 0); + return !nextStreamDsr_ && + (conn_.streamManager->hasNonDSRLoss() || + (conn_.streamManager->hasNonDSRWritable() && + getSendConnFlowControlBytesWire(conn_) > 0)); } bool StreamFrameScheduler::writeStreamFrame( diff --git a/quic/api/QuicPacketScheduler.h b/quic/api/QuicPacketScheduler.h index e956958e1..c82824169 100644 --- a/quic/api/QuicPacketScheduler.h +++ b/quic/api/QuicPacketScheduler.h @@ -118,6 +118,7 @@ class StreamFrameScheduler { uint64_t& connWritableBytes); QuicConnectionStateBase& conn_; + bool nextStreamDsr_{false}; }; class AckScheduler { diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 0af0fa5cd..8b7c71d65 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -675,7 +675,6 @@ void updateConnection( stream->newStreamBytesSent += writeStreamFrame.len; } conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); streamBytesSent += writeStreamFrame.len; detailsPerStream.addFrame(writeStreamFrame, newStreamDataWritten); break; diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 5324f3f26..65bdcbddd 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -1218,8 +1218,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) { false); scheduler.writeStreams(builder); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), 0); } @@ -1266,8 +1265,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) { // Force the wraparound initially. scheduler.writeStreams(builder); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), 4); // Should write frames for stream2, stream3, followed by stream1 again. @@ -1333,8 +1331,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) { // The default is to wraparound initially. scheduler.writeStreams(builder1); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), stream2); // Should write frames for stream2, stream3, followed by stream1 again. @@ -1361,6 +1358,97 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) { EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3); } +TEST_F( + QuicPacketSchedulerTest, + StreamFrameSchedulerRoundRobinStreamPerPacketHitsDsr) { + QuicClientConnectionState conn( + FizzClientQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000; + conn.transportSettings.streamFramePerPacket = true; + auto connId = getTestConnectionId(); + StreamFrameScheduler scheduler(conn); + ShortHeader shortHeader1( + ProtectionType::KeyPhaseZero, + connId, + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder1( + conn.udpSendPacketLen, + std::move(shortHeader1), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + auto stream1 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream2 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream3 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto stream4 = + conn.streamManager->createNextBidirectionalStream().value()->id; + auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 2, 4096); + auto curBuf = largeBuf.get(); + do { + curBuf->append(curBuf->capacity()); + curBuf = curBuf->next(); + } while (curBuf != largeBuf.get()); + writeDataToQuicStream( + *conn.streamManager->findStream(stream1), std::move(largeBuf), false); + writeDataToQuicStream( + *conn.streamManager->findStream(stream2), + folly::IOBuf::copyBuffer("some data"), + false); + writeDataToQuicStream( + *conn.streamManager->findStream(stream3), + folly::IOBuf::copyBuffer("some data"), + false); + auto sender = std::make_unique(); + ON_CALL(*sender, addSendInstruction(testing::_)) + .WillByDefault(testing::Return(true)); + ON_CALL(*sender, flush()).WillByDefault(testing::Return(true)); + auto dsrStream = conn.streamManager->findStream(stream4); + dsrStream->dsrSender = std::move(sender); + BufferMeta bufMeta(20); + writeDataToQuicStream( + *conn.streamManager->findStream(stream4), + folly::IOBuf::copyBuffer("some data"), + false); + writeBufMetaToQuicStream( + *conn.streamManager->findStream(stream4), bufMeta, true /* eof */); + // Pretend we sent the non DSR data + dsrStream->ackedIntervals.insert(0, dsrStream->writeBuffer.chainLength() - 1); + dsrStream->currentWriteOffset = dsrStream->writeBuffer.chainLength(); + dsrStream->writeBuffer.move(); + conn.streamManager->updateWritableStreams(*dsrStream); + + // The default is to wraparound initially. + scheduler.writeStreams(builder1); + EXPECT_EQ( + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), + stream2); + + // Should write frames for stream2, stream3, followed by an empty write. + NiceMock builder2; + EXPECT_CALL(builder2, remainingSpaceInPkt()).WillRepeatedly(Return(4096)); + EXPECT_CALL(builder2, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) { + builder2.frames_.push_back(f); + })); + auto& frames = builder2.frames_; + ASSERT_TRUE(scheduler.hasPendingData()); + scheduler.writeStreams(builder2); + ASSERT_EQ(frames.size(), 1); + ASSERT_TRUE(scheduler.hasPendingData()); + scheduler.writeStreams(builder2); + ASSERT_EQ(frames.size(), 2); + EXPECT_FALSE(scheduler.hasPendingData()); + scheduler.writeStreams(builder2); + WriteStreamFrame f1(stream2, 0, 9, false); + WriteStreamFrame f2(stream3, 0, 9, false); + ASSERT_TRUE(frames[0].asWriteStreamFrame()); + EXPECT_EQ(*frames[0].asWriteStreamFrame(), f1); + ASSERT_TRUE(frames[1].asWriteStreamFrame()); + EXPECT_EQ(*frames[1].asWriteStreamFrame(), f2); +} + TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) { QuicClientConnectionState conn( FizzClientQuicHandshakeContext::Builder().build()); @@ -1406,7 +1494,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) { // The default is to wraparound initially. scheduler.writeStreams(builder1); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( + conn.streamManager->writeQueue().getNextScheduledStream( Priority(0, false)), stream1); @@ -1473,7 +1561,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) { // The default is to wraparound initially. scheduler.writeStreams(builder1); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( + conn.streamManager->writeQueue().getNextScheduledStream( Priority(0, false)), stream1); @@ -1550,8 +1638,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) { // The default is to wraparound initially. scheduler.writeStreams(builder); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), stream3); EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2); @@ -1578,8 +1665,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) { EXPECT_EQ(*frames[3].asWriteStreamFrame(), f4); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), stream3); EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2); } @@ -1605,8 +1691,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) { writeDataToQuicStream(*stream1, folly::IOBuf::copyBuffer("some data"), false); scheduler.writeStreams(builder); EXPECT_EQ( - conn.streamManager->writableStreams().getNextScheduledStream( - kDefaultPriority), + conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority), 0); } @@ -1644,7 +1729,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRemoveOne) { // Manually remove a stream and set the next scheduled to that stream. builder.frames_.clear(); - conn.streamManager->writableStreams().setNextScheduledStream(stream2); + conn.streamManager->writeQueue().setNextScheduledStream(stream2); conn.streamManager->removeWritable(*conn.streamManager->findStream(stream2)); scheduler.writeStreams(builder); ASSERT_EQ(builder.frames_.size(), 1); @@ -1817,7 +1902,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) { stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0])); stream->retransmissionBuffer.clear(); conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); EXPECT_TRUE(scheduler.hasPendingData()); // Write again @@ -1862,7 +1946,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) { dsrStream->insertIntoLossBufMeta(bufMeta); conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*dsrStream); - conn.streamManager->updateLossStreams(*dsrStream); StreamFrameScheduler scheduler(conn); EXPECT_TRUE(scheduler.hasPendingData()); @@ -1931,7 +2014,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) { stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0])); stream->retransmissionBuffer.clear(); conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); EXPECT_TRUE(scheduler.hasPendingData()); // Write again diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index d9bfe218f..a9a3b013c 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -165,7 +165,6 @@ void dropPackets(QuicServerConnectionState& conn) { std::move(*itr->second)); stream->retransmissionBuffer.erase(itr); conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); } } conn.outstandings.reset(); @@ -295,7 +294,6 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLoss) { } while (curBuf != largeBuf.get()); lossStreamState->lossBuffer.emplace_back(std::move(largeBuf), 31, false); conn.streamManager->updateWritableStreams(*lossStreamState); - conn.streamManager->updateLossStreams(*lossStreamState); transport_->writeChain( stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0); @@ -4262,7 +4260,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) { conn.outstandings.reset(); // Start from stream2 instead of stream1 - conn.streamManager->writableStreams().setNextScheduledStream(s2); + conn.streamManager->writeQueue().setNextScheduledStream(s2); writableBytes = kDefaultUDPSendPacketLen - 100; EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); @@ -4287,7 +4285,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) { conn.outstandings.reset(); // Test wrap around - conn.streamManager->writableStreams().setNextScheduledStream(s2); + conn.streamManager->writeQueue().setNextScheduledStream(s2); writableBytes = kDefaultUDPSendPacketLen; EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); writeQuicDataToSocket( diff --git a/quic/common/test/TestUtils.cpp b/quic/common/test/TestUtils.cpp index 0ae47db1c..49bae9a0b 100644 --- a/quic/common/test/TestUtils.cpp +++ b/quic/common/test/TestUtils.cpp @@ -731,8 +731,8 @@ void overridePacketWithToken( } bool writableContains(QuicStreamManager& streamManager, StreamId streamId) { - return streamManager.writableStreams().count(streamId) > 0 || - streamManager.writableControlStreams().count(streamId) > 0; + return streamManager.writeQueue().count(streamId) > 0 || + streamManager.controlWriteQueue().count(streamId) > 0; } std::unique_ptr diff --git a/quic/dsr/backend/test/DSRPacketizerTest.cpp b/quic/dsr/backend/test/DSRPacketizerTest.cpp index c46e70fc8..809a94cb6 100644 --- a/quic/dsr/backend/test/DSRPacketizerTest.cpp +++ b/quic/dsr/backend/test/DSRPacketizerTest.cpp @@ -140,6 +140,11 @@ TEST_F(DSRMultiWriteTest, TwoRequestsWithLoss) { prepareFlowControlAndStreamLimit(); auto streamId = prepareOneStream(1000); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto bufMetaStartingOffset = stream->writeBufMeta.offset; // Move part of the BufMetas to lossBufMetas auto split = stream->writeBufMeta.split(500); diff --git a/quic/dsr/frontend/Scheduler.cpp b/quic/dsr/frontend/Scheduler.cpp index 713ee74ca..446b93319 100644 --- a/quic/dsr/frontend/Scheduler.cpp +++ b/quic/dsr/frontend/Scheduler.cpp @@ -18,9 +18,34 @@ DSRStreamFrameScheduler::DSRStreamFrameScheduler( : conn_(conn) {} bool DSRStreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasDSRLoss() || - (conn_.streamManager->hasDSRWritable() && - getSendConnFlowControlBytesWire(conn_) > 0); + return !nextStreamNonDsr_ && + (conn_.streamManager->hasDSRLoss() || + (conn_.streamManager->hasDSRWritable() && + getSendConnFlowControlBytesWire(conn_) > 0)); +} + +DSRStreamFrameScheduler::SchedulingResult +DSRStreamFrameScheduler::enrichAndAddSendInstruction( + uint32_t encodedSize, + DSRStreamFrameScheduler::SchedulingResult result, + DSRPacketBuilderBase& packetBuilder, + SendInstruction::Builder& instructionBuilder, + const PriorityQueue& writeQueue, + const PriorityQueue::LevelItr& levelIter, + QuicStreamState& stream) { + enrichInstruction(instructionBuilder, stream); + packetBuilder.addSendInstruction( + instructionBuilder.build(), encodedSize, stream.streamPacketIdx++); + result.writeSuccess = true; + result.sender = stream.dsrSender.get(); + levelIter->iterator->next(); + auto nextStreamId = writeQueue.getNextScheduledStream(); + auto nextStream = + CHECK_NOTNULL(conn_.streamManager->findStream(nextStreamId)); + if (nextStream->hasSchedulableData()) { + nextStreamNonDsr_ = true; + } + return result; } /** @@ -32,22 +57,25 @@ bool DSRStreamFrameScheduler::hasPendingData() const { DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( DSRPacketBuilderBase& builder) { SchedulingResult result; - auto& writableDSRStreams = conn_.streamManager->writableDSRStreams(); + auto& writeQueue = conn_.streamManager->writeQueue(); const auto& levelIter = std::find_if( - writableDSRStreams.levels.cbegin(), - writableDSRStreams.levels.cend(), + writeQueue.levels.cbegin(), + writeQueue.levels.cend(), [&](const auto& level) { return !level.empty(); }); - if (levelIter == writableDSRStreams.levels.cend()) { + if (levelIter == writeQueue.levels.cend()) { return result; } levelIter->iterator->begin(); auto streamId = levelIter->iterator->current(); auto stream = conn_.streamManager->findStream(streamId); CHECK(stream); - CHECK(stream->dsrSender); + if (!stream->dsrSender || !stream->hasSchedulableDsr()) { + nextStreamNonDsr_ = true; + return result; + } bool hasFreshBufMeta = stream->writeBufMeta.length > 0; bool hasLossBufMeta = !stream->lossBufMetas.empty(); - CHECK(hasFreshBufMeta || hasLossBufMeta); + CHECK(stream->hasSchedulableDsr()); if (hasLossBufMeta) { SendInstruction::Builder instructionBuilder(conn_, streamId); auto encodedSize = writeDSRStreamFrame( @@ -64,15 +92,14 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( if (builder.remainingSpace() < encodedSize) { return result; } - enrichInstruction(instructionBuilder, *stream); - builder.addSendInstruction( - instructionBuilder.build(), - /*streamEncodedSize=*/encodedSize, - /*streamPacketIdx=*/stream->streamPacketIdx++); - result.writeSuccess = true; - result.sender = stream->dsrSender.get(); - levelIter->iterator->next(); - return result; + return enrichAndAddSendInstruction( + encodedSize, + std::move(result), + builder, + instructionBuilder, + writeQueue, + levelIter, + *stream); } } if (!hasFreshBufMeta || builder.remainingSpace() == 0) { @@ -109,13 +136,14 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( if (builder.remainingSpace() < encodedSize) { return result; } - enrichInstruction(instructionBuilder, *stream); - builder.addSendInstruction( - instructionBuilder.build(), encodedSize, stream->streamPacketIdx++); - result.writeSuccess = true; - result.sender = stream->dsrSender.get(); - levelIter->iterator->next(); - return result; + return enrichAndAddSendInstruction( + encodedSize, + std::move(result), + builder, + instructionBuilder, + writeQueue, + levelIter, + *stream); } return result; } diff --git a/quic/dsr/frontend/Scheduler.h b/quic/dsr/frontend/Scheduler.h index f31e2e003..4d875c15f 100644 --- a/quic/dsr/frontend/Scheduler.h +++ b/quic/dsr/frontend/Scheduler.h @@ -37,8 +37,17 @@ class DSRStreamFrameScheduler { void enrichInstruction( SendInstruction::Builder& builder, const QuicStreamState& stream); + SchedulingResult enrichAndAddSendInstruction( + uint32_t, + SchedulingResult, + DSRPacketBuilderBase&, + SendInstruction::Builder&, + const PriorityQueue&, + const PriorityQueue::LevelItr&, + QuicStreamState&); private: QuicServerConnectionState& conn_; + bool nextStreamNonDsr_{false}; }; } // namespace quic diff --git a/quic/dsr/frontend/test/WriteFunctionsTest.cpp b/quic/dsr/frontend/test/WriteFunctionsTest.cpp index 588d27c27..46e0f3927 100644 --- a/quic/dsr/frontend/test/WriteFunctionsTest.cpp +++ b/quic/dsr/frontend/test/WriteFunctionsTest.cpp @@ -69,6 +69,11 @@ TEST_F(WriteFunctionsTest, WriteLoopTimeLimit) { auto streamId = prepareOneStream(3000); auto cid = getTestConnectionId(); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto currentBufMetaOffset = stream->writeBufMeta.offset; size_t packetLimit = 2; conn_.lossState.srtt = 100ms; @@ -96,6 +101,11 @@ TEST_F(WriteFunctionsTest, WriteLoopTimeLimitNoLimit) { auto streamId = prepareOneStream(3000); auto cid = getTestConnectionId(); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto currentBufMetaOffset = stream->writeBufMeta.offset; size_t packetLimit = 2; conn_.lossState.srtt = 100ms; @@ -123,6 +133,11 @@ TEST_F(WriteFunctionsTest, WriteTwoInstructions) { prepareFlowControlAndStreamLimit(); auto streamId = prepareOneStream(2000); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto cid = getTestConnectionId(); size_t packetLimit = 20; EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); @@ -136,6 +151,11 @@ TEST_F(WriteFunctionsTest, PacketLimit) { prepareFlowControlAndStreamLimit(); auto streamId = prepareOneStream(2000 * 100); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto mockCongestionController = std::make_unique>(); auto rawCongestionController = mockCongestionController.get(); @@ -157,20 +177,51 @@ TEST_F(WriteFunctionsTest, WriteTwoStreams) { auto streamId2 = prepareOneStream(1000); auto stream1 = conn_.streamManager->findStream(streamId1); auto stream2 = conn_.streamManager->findStream(streamId2); + // Pretend we sent the non DSR data on second stream + stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1); + stream2->currentWriteOffset = stream2->writeBuffer.chainLength(); + stream2->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream2); auto cid = getTestConnectionId(); size_t packetLimit = 20; EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); EXPECT_EQ(1, stream1->retransmissionBufMetas.size()); EXPECT_EQ(1, stream2->retransmissionBufMetas.size()); - // TODO: This needs to be fixed later: The stream and the sender needs to be - // 1:1 in the future. Then there will be two senders for this test case and - // each of them will send out one instruction. EXPECT_EQ(1, countInstructions(streamId1)); EXPECT_EQ(1, countInstructions(streamId2)); EXPECT_EQ(2, conn_.outstandings.packets.size()); EXPECT_TRUE(verifyAllOutstandingsAreDSR()); } +TEST_F(WriteFunctionsTest, WriteThreeStreamsNonDsrAndDsr) { + prepareFlowControlAndStreamLimit(); + auto streamId1 = prepareOneStream(1000); + auto streamId2 = prepareOneStream(1000); + auto streamId3 = prepareOneStream(1000); + auto stream1 = conn_.streamManager->findStream(streamId1); + auto stream2 = conn_.streamManager->findStream(streamId2); + auto stream3 = conn_.streamManager->findStream(streamId3); + auto cid = getTestConnectionId(); + size_t packetLimit = 20; + // First loop only write a single packet because it will find there's non-DSR + // data to write on the next stream. + EXPECT_EQ(1, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); + // Pretend we sent the non DSR data for last stream + stream3->ackedIntervals.insert(0, stream3->writeBuffer.chainLength() - 1); + stream3->currentWriteOffset = stream3->writeBuffer.chainLength(); + stream3->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream3); + EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); + EXPECT_EQ(1, stream1->retransmissionBufMetas.size()); + EXPECT_EQ(1, stream2->retransmissionBufMetas.size()); + EXPECT_EQ(1, stream3->retransmissionBufMetas.size()); + EXPECT_EQ(1, countInstructions(streamId1)); + EXPECT_EQ(1, countInstructions(streamId2)); + EXPECT_EQ(1, countInstructions(streamId3)); + EXPECT_EQ(3, conn_.outstandings.packets.size()); + EXPECT_TRUE(verifyAllOutstandingsAreDSR()); +} + TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) { prepareFlowControlAndStreamLimit(); auto streamId1 = prepareOneStream(2000); @@ -179,6 +230,11 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) { auto stream2 = conn_.streamManager->findStream(streamId2); conn_.streamManager->setStreamPriority(streamId1, Priority{3, false}); conn_.streamManager->setStreamPriority(streamId2, Priority{3, false}); + // Pretend we sent the non DSR data on first stream + stream1->ackedIntervals.insert(0, stream1->writeBuffer.chainLength() - 1); + stream1->currentWriteOffset = stream1->writeBuffer.chainLength(); + stream1->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream1); auto cid = getTestConnectionId(); size_t packetLimit = 2; EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); @@ -198,6 +254,11 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsIncremental) { auto stream2 = conn_.streamManager->findStream(streamId2); conn_.streamManager->setStreamPriority(streamId1, Priority{3, true}); conn_.streamManager->setStreamPriority(streamId2, Priority{3, true}); + // Pretend we sent the non DSR data on second stream + stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1); + stream2->currentWriteOffset = stream2->writeBuffer.chainLength(); + stream2->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream2); auto cid = getTestConnectionId(); size_t packetLimit = 2; EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); @@ -213,6 +274,11 @@ TEST_F(WriteFunctionsTest, LossAndFreshTwoInstructionsInTwoPackets) { prepareFlowControlAndStreamLimit(); auto streamId = prepareOneStream(1000); auto stream = conn_.streamManager->findStream(streamId); + // Pretend we sent the non DSR data + stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); + stream->currentWriteOffset = stream->writeBuffer.chainLength(); + stream->writeBuffer.move(); + conn_.streamManager->updateWritableStreams(*stream); auto bufMetaStartingOffset = stream->writeBufMeta.offset; // Move part of the BufMetas to lossBufMetas auto split = stream->writeBufMeta.split(500); @@ -246,7 +312,7 @@ TEST_F( // Move part of the BufMetas to lossBufMetas auto split = stream->writeBufMeta.split(500); stream->lossBufMetas.push_back(split); - conn_.streamManager->updateLossStreams(*stream); + conn_.streamManager->updateWritableStreams(*stream); // Zero out conn flow control. conn_.flowControlState.sumCurWriteOffset = conn_.flowControlState.peerAdvertisedMaxOffset; diff --git a/quic/loss/QuicLossFunctions.cpp b/quic/loss/QuicLossFunctions.cpp index 9bdc91148..e8c93245c 100644 --- a/quic/loss/QuicLossFunctions.cpp +++ b/quic/loss/QuicLossFunctions.cpp @@ -200,7 +200,6 @@ void markPacketLoss( stream->retransmissionBufMetas.erase(retxBufMetaItr); } conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->updateLossStreams(*stream); break; } case QuicWriteFrame::Type::WriteCryptoFrame: { diff --git a/quic/loss/test/QuicLossFunctionsTest.cpp b/quic/loss/test/QuicLossFunctionsTest.cpp index dad7b6ad7..a0831e48e 100644 --- a/quic/loss/test/QuicLossFunctionsTest.cpp +++ b/quic/loss/test/QuicLossFunctionsTest.cpp @@ -2348,7 +2348,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { ASSERT_EQ(200, retxBufMetaIter->second.length); ASSERT_FALSE(retxBufMetaIter->second.eof); conn->streamManager->updateWritableStreams(*stream); - conn->streamManager->updateLossStreams(*stream); EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); @@ -2370,7 +2369,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { ASSERT_EQ(400, retxBufMetaIter->second.length); ASSERT_FALSE(retxBufMetaIter->second.eof); conn->streamManager->updateWritableStreams(*stream); - conn->streamManager->updateLossStreams(*stream); EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); @@ -2392,7 +2390,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { ASSERT_EQ(400, retxBufMetaIter->second.length); ASSERT_TRUE(retxBufMetaIter->second.eof); conn->streamManager->updateWritableStreams(*stream); - conn->streamManager->updateLossStreams(*stream); EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_TRUE(conn->streamManager->writableDSRStreams().empty()); @@ -2411,7 +2408,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { EXPECT_EQ(2, stream->retransmissionBufMetas.size()); EXPECT_TRUE(conn->streamManager->hasLoss()); ASSERT_EQ(stream->streamLossCount, 1); - EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); + EXPECT_FALSE(stream->hasWritableBufMeta()); + EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id)); + EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id)); // Lose the 3rd dsr packet: RegularQuicWritePacket packet3(PacketHeader(ShortHeader( @@ -2428,7 +2427,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { EXPECT_EQ(1, stream->retransmissionBufMetas.size()); ASSERT_EQ(stream->streamLossCount, 2); EXPECT_TRUE(conn->streamManager->hasLoss()); - EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); + EXPECT_FALSE(stream->hasWritableBufMeta()); + EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id)); + EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id)); // Lose the 3nd dsr packet, it should be merged together with the first // element in the lossBufMetas: @@ -2446,7 +2447,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { EXPECT_EQ(0, stream->retransmissionBufMetas.size()); ASSERT_EQ(stream->streamLossCount, 3); EXPECT_TRUE(conn->streamManager->hasLoss()); - EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); + EXPECT_FALSE(stream->hasWritableBufMeta()); + EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id)); + EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id)); } TEST_F(QuicLossFunctionsTest, TestReorderingThresholdDSRNormal) { diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 3a916eb4e..9c6d061bb 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -325,13 +325,8 @@ void QuicServerTransport::writeData() { } if (conn_->oneRttWriteCipher) { CHECK(conn_->oneRttWriteHeaderCipher); - // This kind of sucks, but right now we don't have a way to schedule DSR - // and non-DSR streams as part of the same priority queue. This can lead - // to prioritity inversions and starving of one kind of stream over the - // other. To mitigate this, randomly decide whether to write DSR or non - // DSR data first which at least ensures fairness over time. auto writeLoopBeginTime = Clock::now(); - auto nonDsrPath = [&]() { + auto nonDsrPath = [&](auto limit) { return writeQuicDataToSocket( *socket_, *conn_, @@ -340,33 +335,34 @@ void QuicServerTransport::writeData() { *conn_->oneRttWriteCipher, *conn_->oneRttWriteHeaderCipher, version, - packetLimit, + limit, writeLoopBeginTime); }; - auto dsrPath = [&]() { + auto dsrPath = [&](auto limit) { return writePacketizationRequest( *serverConn_, destConnId, - packetLimit, + limit, *conn_->oneRttWriteCipher, writeLoopBeginTime); }; - if (folly::Random::oneIn(2)) { - if (packetLimit && congestionControlWritableBytes(*serverConn_)) { - packetLimit -= dsrPath(); - } - if (packetLimit) { - packetLimit -= nonDsrPath().packetsWritten; - } - } else { - auto written = nonDsrPath(); + // We need a while loop because both paths write streams from the same + // queue, which can result in empty writes. + while (packetLimit) { + auto startingPacketLimit = packetLimit; + // Give the non-DSR path a chance. + auto written = nonDsrPath(packetLimit); // If we didn't write much from the non DSR path, don't penalize DSR // a full packet. if (written.bytesWritten >= conn_->udpSendPacketLen / 2) { packetLimit -= written.packetsWritten; } if (packetLimit && congestionControlWritableBytes(*serverConn_)) { - packetLimit -= dsrPath(); + packetLimit -= dsrPath(packetLimit); + } + if (startingPacketLimit == packetLimit) { + // We haven't written anything with either path, so we're done. + break; } } } diff --git a/quic/state/AckHandlers.cpp b/quic/state/AckHandlers.cpp index 57c867d4b..d57180956 100644 --- a/quic/state/AckHandlers.cpp +++ b/quic/state/AckHandlers.cpp @@ -170,7 +170,6 @@ AckEvent processAckFrame( streamFrame->offset, streamFrame->len, streamFrame->fin); stream->updateAckedIntervals( streamFrame->offset, streamFrame->len, streamFrame->fin); - conn.streamManager->updateLossStreams(*stream); conn.streamManager->updateWritableStreams(*stream); } } diff --git a/quic/state/QuicPriorityQueue.h b/quic/state/QuicPriorityQueue.h index 2159c9979..d910f0707 100644 --- a/quic/state/QuicPriorityQueue.h +++ b/quic/state/QuicPriorityQueue.h @@ -157,6 +157,7 @@ struct PriorityQueue { folly::F14FastMap streamToOrderId; }; std::vector levels; + using LevelItr = decltype(levels)::const_iterator; PriorityQueue() : levels(kDefaultPriorityLevelsSize) { for (size_t index = 0; index < levels.size(); index++) { @@ -258,6 +259,18 @@ struct PriorityQueue { return level.iterator->nextStreamIt->streamId; } + FOLLY_NODISCARD StreamId getNextScheduledStream() const { + const auto& levelIter = + std::find_if(levels.cbegin(), levels.cend(), [&](const auto& level) { + return !level.empty(); + }); + // The expectation is that calling this function on an empty queue is + // a bug. + CHECK(levelIter != levels.cend()); + levelIter->iterator->begin(); + return levelIter->iterator->current(); + } + private: folly::F14FastMap writableStreamsToLevel_; using WSIterator = decltype(writableStreamsToLevel_)::iterator; diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index 9b64da63b..b4ccf8267 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -246,10 +246,7 @@ bool QuicStreamManager::setStreamPriority(StreamId id, Priority newPriority) { } notifyStreamPriorityChanges(); } - // If this stream is already in the writable or loss queues, update the - // priority there. - writableStreams_.updateIfExist(id, stream->priority); - writableDSRStreams_.updateIfExist(id, stream->priority); + writeQueue_.updateIfExist(id, stream->priority); return true; } return false; @@ -560,10 +557,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { DCHECK(it->second.inTerminalStates()); readableStreams_.erase(streamId); peekableStreams_.erase(streamId); - writableStreams_.erase(streamId); - writableDSRStreams_.erase(streamId); - writableControlStreams_.erase(streamId); - removeLoss(streamId); + removeWritable(it->second); blockedStreams_.erase(streamId); deliverableStreams_.erase(streamId); txStreams_.erase(streamId); @@ -644,22 +638,49 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) { CHECK(stream.lossBuffer.empty()); CHECK(stream.lossBufMetas.empty()); removeWritable(stream); - removeDSRWritable(stream); + writableStreams_.erase(stream.id); + writableDSRStreams_.erase(stream.id); + lossStreams_.erase(stream.id); + lossDSRStreams_.erase(stream.id); + if (stream.isControl) { + controlWriteQueue_.erase(stream.id); + } else { + writeQueue_.erase(stream.id); + } return; } - if (stream.hasWritableData() || !stream.lossBuffer.empty()) { - addWritable(stream); + if (stream.hasWritableData()) { + writableStreams_.emplace(stream.id); } else { - removeWritable(stream); + writableStreams_.erase(stream.id); } - if (stream.isControl) { - return; - } - if (stream.dsrSender && - (stream.hasWritableBufMeta() || !stream.lossBufMetas.empty())) { - addDSRWritable(stream); + if (stream.hasWritableBufMeta()) { + writableDSRStreams_.emplace(stream.id); } else { - removeDSRWritable(stream); + writableDSRStreams_.erase(stream.id); + } + if (!stream.lossBuffer.empty()) { + lossStreams_.emplace(stream.id); + } else { + lossStreams_.erase(stream.id); + } + if (!stream.lossBufMetas.empty()) { + lossDSRStreams_.emplace(stream.id); + } else { + lossDSRStreams_.erase(stream.id); + } + if (stream.hasSchedulableData() || stream.hasSchedulableDsr()) { + if (stream.isControl) { + controlWriteQueue_.emplace(stream.id); + } else { + writeQueue_.insertOrUpdate(stream.id, stream.priority); + } + } else { + if (stream.isControl) { + controlWriteQueue_.erase(stream.id); + } else { + writeQueue_.erase(stream.id); + } } } diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index 1f3ea9738..e2642bd30 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -130,9 +130,10 @@ class QuicStreamManager { lossDSRStreams_ = std::move(other.lossDSRStreams_); readableStreams_ = std::move(other.readableStreams_); peekableStreams_ = std::move(other.peekableStreams_); + writeQueue_ = std::move(other.writeQueue_); + controlWriteQueue_ = std::move(other.controlWriteQueue_); writableStreams_ = std::move(other.writableStreams_); writableDSRStreams_ = std::move(other.writableDSRStreams_); - writableControlStreams_ = std::move(other.writableControlStreams_); txStreams_ = std::move(other.txStreams_); deliverableStreams_ = std::move(other.deliverableStreams_); closedStreams_ = std::move(other.closedStreams_); @@ -385,19 +386,6 @@ class QuicStreamManager { lossStreams_.insert(id); } - void updateLossStreams(const QuicStreamState& stream) { - if (!stream.hasLoss()) { - removeLoss(stream.id); - } else { - if (!stream.lossBuffer.empty()) { - lossStreams_.emplace(stream.id); - } - if (!stream.lossBufMetas.empty()) { - lossDSRStreams_.emplace(stream.id); - } - } - } - /** * Update stream priority if the stream indicated by id exists, and the * passed in values are different from current priority. Return true if @@ -423,16 +411,19 @@ class QuicStreamManager { * Returns a mutable reference to the container holding the writable stream * IDs. */ - auto& writableControlStreams() { - return writableControlStreams_; + auto& controlWriteQueue() { + return controlWriteQueue_; + } + + auto& writeQueue() { + return writeQueue_; } /* * Returns if there are any writable streams. */ bool hasWritable() const { - return !writableStreams_.empty() || !writableDSRStreams_.empty() || - !writableControlStreams_.empty(); + return !writeQueue_.empty() || !controlWriteQueue_.empty(); } FOLLY_NODISCARD bool hasDSRWritable() const { @@ -440,7 +431,7 @@ class QuicStreamManager { } bool hasNonDSRWritable() const { - return !writableStreams_.empty() || !writableControlStreams_.empty(); + return !writableStreams_.empty() || !controlWriteQueue_.empty(); } /* @@ -448,17 +439,26 @@ class QuicStreamManager { */ void addWritable(const QuicStreamState& stream) { if (stream.isControl) { - writableControlStreams_.insert(stream.id); + // Control streams get their own queue. + CHECK(stream.hasSchedulableData()); + controlWriteQueue_.insert(stream.id); } else { - CHECK(stream.hasWritableData() || !stream.lossBuffer.empty()); - writableStreams_.insertOrUpdate(stream.id, stream.priority); + CHECK(stream.hasSchedulableData() || stream.hasSchedulableDsr()); + writeQueue_.insertOrUpdate(stream.id, stream.priority); + } + if (stream.hasWritableData()) { + writableStreams_.insert(stream.id); + } + if (stream.hasWritableBufMeta()) { + LOG(ERROR) << "writable DSR: " << stream.id; + writableDSRStreams_.insert(stream.id); + } + if (!stream.lossBuffer.empty()) { + lossStreams_.insert(stream.id); + } + if (!stream.lossBufMetas.empty()) { + lossDSRStreams_.insert(stream.id); } - } - - void addDSRWritable(const QuicStreamState& stream) { - CHECK(!stream.isControl); - CHECK(stream.hasWritableBufMeta() || !stream.lossBufMetas.empty()); - writableDSRStreams_.insertOrUpdate(stream.id, stream.priority); } /* @@ -466,15 +466,14 @@ class QuicStreamManager { */ void removeWritable(const QuicStreamState& stream) { if (stream.isControl) { - writableControlStreams_.erase(stream.id); + controlWriteQueue_.erase(stream.id); } else { - writableStreams_.erase(stream.id); + writeQueue_.erase(stream.id); } - } - - void removeDSRWritable(const QuicStreamState& stream) { - CHECK(!stream.isControl); + writableStreams_.erase(stream.id); writableDSRStreams_.erase(stream.id); + lossStreams_.erase(stream.id); + lossDSRStreams_.erase(stream.id); } /* @@ -483,7 +482,8 @@ class QuicStreamManager { void clearWritable() { writableStreams_.clear(); writableDSRStreams_.clear(); - writableControlStreams_.clear(); + writeQueue_.clear(); + controlWriteQueue_.clear(); } /* @@ -1166,12 +1166,14 @@ class QuicStreamManager { // Set of streams that have pending peeks folly::F14FastSet peekableStreams_; - // Set of !control streams that have writable data - PriorityQueue writableStreams_; - PriorityQueue writableDSRStreams_; + // Set of !control streams that have writable data used for frame scheduling + PriorityQueue writeQueue_; // Set of control streams that have writable data - std::set writableControlStreams_; + std::set controlWriteQueue_; + + folly::F14FastSet writableStreams_; + folly::F14FastSet writableDSRStreams_; // Streams that may be able to call TxCallback folly::F14FastSet txStreams_; diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index 022df6dc4..3379f429c 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -411,6 +411,15 @@ struct QuicStreamState : public QuicStreamLike { return false; } + // Whether this stream has non-DSR data in the write buffer or loss buffer. + FOLLY_NODISCARD bool hasSchedulableData() const { + return hasWritableData() || !lossBuffer.empty(); + } + + FOLLY_NODISCARD bool hasSchedulableDsr() const { + return hasWritableBufMeta() || !lossBufMetas.empty(); + } + FOLLY_NODISCARD bool hasWritableBufMeta() const { if (writeBufMeta.offset == 0) { return false; diff --git a/quic/state/stream/StreamStateFunctions.cpp b/quic/state/stream/StreamStateFunctions.cpp index f343f6f3f..7c7bc9bb4 100644 --- a/quic/state/stream/StreamStateFunctions.cpp +++ b/quic/state/stream/StreamStateFunctions.cpp @@ -27,7 +27,6 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) { } stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream); - stream.conn.streamManager->updateLossStreams(stream); } void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) { @@ -63,7 +62,6 @@ void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) { } stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream); - stream.conn.streamManager->updateLossStreams(stream); QUIC_STATS(stream.conn.statsCallback, onQuicStreamReset, frame.errorCode); } diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index a30ed9a9b..626042d83 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -2236,14 +2236,14 @@ TEST_F(QuicStreamFunctionsTest, LossBufferEmpty) { StreamId id = 4; QuicStreamState stream(id, conn); conn.streamManager->addLoss(id); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_FALSE(conn.streamManager->hasLoss()); } TEST_F(QuicStreamFunctionsTest, LossBufferEmptyNoChange) { StreamId id = 4; QuicStreamState stream(id, conn); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_FALSE(conn.streamManager->hasLoss()); } @@ -2251,7 +2251,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferHasData) { StreamId id = 4; QuicStreamState stream(id, conn); stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); } @@ -2263,7 +2263,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferMetaHasData) { b.setOffset(10); b.setEOF(false); stream.lossBufMetas.emplace_back(b.build()); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); } @@ -2276,7 +2276,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferMetaStillHasData) { b.setOffset(10); b.setEOF(false); stream.lossBufMetas.emplace_back(b.build()); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); } @@ -2285,7 +2285,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferStillHasData) { QuicStreamState stream(id, conn); conn.streamManager->addLoss(id); stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); - conn.streamManager->updateLossStreams(stream); + conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); }