diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index 2b171aae1..e3f802ba4 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -512,7 +512,7 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) { } // namespace quic bool StreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasLoss() || + return conn_.streamManager->hasNonDSRLoss() || (conn_.streamManager->hasNonDSRWritable() && getSendConnFlowControlBytesWire(conn_) > 0); } diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 340261adf..cc69c2fe0 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -1844,6 +1844,53 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) { EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); } +TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) { + QuicServerConnectionState conn( + FizzServerQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 1000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 1000; + + auto streamId = (*conn.streamManager->createNextBidirectionalStream())->id; + auto dsrStream = conn.streamManager->createNextBidirectionalStream().value(); + auto stream = conn.streamManager->findStream(streamId); + auto data = buildRandomInputData(1000); + writeDataToQuicStream(*stream, std::move(data), true); + WriteBufferMeta bufMeta{}; + bufMeta.offset = 0; + bufMeta.length = 100; + bufMeta.eof = false; + dsrStream->insertIntoLossBufMeta(bufMeta); + conn.streamManager->updateWritableStreams(*stream); + conn.streamManager->updateWritableStreams(*dsrStream); + conn.streamManager->updateLossStreams(*dsrStream); + + StreamFrameScheduler scheduler(conn); + EXPECT_TRUE(scheduler.hasPendingData()); + ShortHeader shortHeader1( + ProtectionType::KeyPhaseZero, + getTestConnectionId(), + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder1( + conn.udpSendPacketLen, + std::move(shortHeader1), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + builder1.encodePacketHeader(); + scheduler.writeStreams(builder1); + auto packet1 = std::move(builder1).buildPacket().packet; + updateConnection( + conn, folly::none, packet1, Clock::now(), 1000, 0, false /* isDSR */); + EXPECT_EQ(1, packet1.frames.size()); + auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); + EXPECT_EQ(streamId, writeStreamFrame1.streamId); + EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); + EXPECT_EQ(0, stream->writeBuffer.chainLength()); + EXPECT_EQ(1, stream->retransmissionBuffer.size()); + EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); + + EXPECT_FALSE(scheduler.hasPendingData()); +} + TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) { QuicServerConnectionState conn( FizzServerQuicHandshakeContext::Builder().build()); diff --git a/quic/dsr/frontend/Scheduler.cpp b/quic/dsr/frontend/Scheduler.cpp index 656321738..9d0f8d875 100644 --- a/quic/dsr/frontend/Scheduler.cpp +++ b/quic/dsr/frontend/Scheduler.cpp @@ -18,8 +18,9 @@ DSRStreamFrameScheduler::DSRStreamFrameScheduler( : conn_(conn) {} bool DSRStreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasDSRWritable() && - getSendConnFlowControlBytesWire(conn_) > 0; + return conn_.streamManager->hasDSRLoss() || + (conn_.streamManager->hasDSRWritable() && + getSendConnFlowControlBytesWire(conn_) > 0); } /** diff --git a/quic/dsr/frontend/test/WriteFunctionsTest.cpp b/quic/dsr/frontend/test/WriteFunctionsTest.cpp index b007f9e15..336296a64 100644 --- a/quic/dsr/frontend/test/WriteFunctionsTest.cpp +++ b/quic/dsr/frontend/test/WriteFunctionsTest.cpp @@ -170,4 +170,33 @@ TEST_F(WriteFunctionsTest, LossAndFreshTwoInstructionsInTwoPackets) { EXPECT_EQ(expectedSecondFrame, *packet2.frames[0].asWriteStreamFrame()); } +TEST_F( + WriteFunctionsTest, + LossAndFreshTwoInstructionsInTwoPacketsNoFlowControl) { + prepareFlowControlAndStreamLimit(); + auto streamId = prepareOneStream(1000); + auto stream = conn_.streamManager->findStream(streamId); + auto bufMetaStartingOffset = stream->writeBufMeta.offset; + // Move part of the BufMetas to lossBufMetas + auto split = stream->writeBufMeta.split(500); + stream->lossBufMetas.push_back(split); + conn_.streamManager->updateLossStreams(*stream); + // Zero out conn flow control. + conn_.flowControlState.sumCurWriteOffset = + conn_.flowControlState.peerAdvertisedMaxOffset; + size_t packetLimit = 10; + // Should only write lost data + EXPECT_EQ( + 1, + writePacketizationRequest( + conn_, getTestConnectionId(), packetLimit, *aead_)); + EXPECT_EQ(1, countInstructions(streamId)); + ASSERT_EQ(1, conn_.outstandings.packets.size()); + auto& packet1 = conn_.outstandings.packets.front().packet; + EXPECT_EQ(1, packet1.frames.size()); + WriteStreamFrame expectedFirstFrame( + streamId, bufMetaStartingOffset, 500, false, true); + EXPECT_EQ(expectedFirstFrame, *packet1.frames[0].asWriteStreamFrame()); +} + } // namespace quic::test diff --git a/quic/loss/QuicLossFunctions.cpp b/quic/loss/QuicLossFunctions.cpp index a21ac03c5..3a26bf300 100644 --- a/quic/loss/QuicLossFunctions.cpp +++ b/quic/loss/QuicLossFunctions.cpp @@ -161,7 +161,7 @@ void markPacketLoss( stream->retransmissionBufMetas.erase(retxBufMetaItr); } conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->addLoss(stream->id); + conn.streamManager->updateLossStreams(*stream); break; } case QuicWriteFrame::Type::WriteCryptoFrame: { diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index 899e152b5..1c12e4299 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -127,6 +127,7 @@ class QuicStreamManager { windowUpdates_ = std::move(other.windowUpdates_); flowControlUpdated_ = std::move(other.flowControlUpdated_); lossStreams_ = std::move(other.lossStreams_); + lossDSRStreams_ = std::move(other.lossDSRStreams_); readableStreams_ = std::move(other.readableStreams_); peekableStreams_ = std::move(other.peekableStreams_); writableStreams_ = std::move(other.writableStreams_); @@ -358,14 +359,28 @@ class QuicStreamManager { } } + // Considers _any_ type of stream data being lost. FOLLY_NODISCARD bool hasLoss() const { + return !lossStreams_.empty() || !lossDSRStreams_.empty(); + } + + // Considers non-DSR data being lost. + FOLLY_NODISCARD bool hasNonDSRLoss() const { return !lossStreams_.empty(); } - void removeLoss(StreamId id) { - lossStreams_.erase(id); + // Considers non-DSR data being lost. + FOLLY_NODISCARD bool hasDSRLoss() const { + return !lossDSRStreams_.empty(); } + // Should only used directly by tests. + void removeLoss(StreamId id) { + lossStreams_.erase(id); + lossDSRStreams_.erase(id); + } + + // Should only used directly by tests. void addLoss(StreamId id) { lossStreams_.insert(id); } @@ -374,7 +389,12 @@ class QuicStreamManager { if (!stream.hasLoss()) { removeLoss(stream.id); } else { - addLoss(stream.id); + if (!stream.lossBuffer.empty()) { + lossStreams_.emplace(stream.id); + } + if (!stream.lossBufMetas.empty()) { + lossDSRStreams_.emplace(stream.id); + } } } @@ -1137,6 +1157,9 @@ class QuicStreamManager { // Streams that have bytes in loss buffer folly::F14FastSet lossStreams_; + // DSR Streams that have bytes in loss buff meta + folly::F14FastSet lossDSRStreams_; + // Set of streams that have pending reads folly::F14FastSet readableStreams_; diff --git a/quic/state/stream/StreamStateFunctions.cpp b/quic/state/stream/StreamStateFunctions.cpp index a549fdc4d..f343f6f3f 100644 --- a/quic/state/stream/StreamStateFunctions.cpp +++ b/quic/state/stream/StreamStateFunctions.cpp @@ -27,7 +27,7 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) { } stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream); - stream.conn.streamManager->removeLoss(stream.id); + stream.conn.streamManager->updateLossStreams(stream); } void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) {