From 03e314a3a4857bf8fd8d8a8ac85995e772943a2d Mon Sep 17 00:00:00 2001 From: Matt Joras Date: Mon, 3 Oct 2022 17:02:52 -0700 Subject: [PATCH] Separately track streams by DSR and non DSR data lost. Summary: We need to do this to maintain consistency with the schedulers. Prior to this fix when we ran out of connection flow control and had lost DSR data the non-DSR stream scheduler would do an empty write _and_ the DSR stream scheduler would do an empty write. To fully resolve the issue we need to track the streams separately (note that the same stream can be in both sets). Reviewed By: jbeshay Differential Revision: D40003361 fbshipit-source-id: fd3e567bc8a2bc8b71d4c4543894053fa6e24dc4 --- quic/api/QuicPacketScheduler.cpp | 2 +- quic/api/test/QuicPacketSchedulerTest.cpp | 47 +++++++++++++++++++ quic/dsr/frontend/Scheduler.cpp | 5 +- quic/dsr/frontend/test/WriteFunctionsTest.cpp | 29 ++++++++++++ quic/loss/QuicLossFunctions.cpp | 2 +- quic/state/QuicStreamManager.h | 29 ++++++++++-- quic/state/stream/StreamStateFunctions.cpp | 2 +- 7 files changed, 108 insertions(+), 8 deletions(-) diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index 2b171aae1..e3f802ba4 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -512,7 +512,7 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) { } // namespace quic bool StreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasLoss() || + return conn_.streamManager->hasNonDSRLoss() || (conn_.streamManager->hasNonDSRWritable() && getSendConnFlowControlBytesWire(conn_) > 0); } diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 340261adf..cc69c2fe0 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -1844,6 +1844,53 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) { EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); } +TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) { + QuicServerConnectionState conn( + FizzServerQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 1000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 1000; + + auto streamId = (*conn.streamManager->createNextBidirectionalStream())->id; + auto dsrStream = conn.streamManager->createNextBidirectionalStream().value(); + auto stream = conn.streamManager->findStream(streamId); + auto data = buildRandomInputData(1000); + writeDataToQuicStream(*stream, std::move(data), true); + WriteBufferMeta bufMeta{}; + bufMeta.offset = 0; + bufMeta.length = 100; + bufMeta.eof = false; + dsrStream->insertIntoLossBufMeta(bufMeta); + conn.streamManager->updateWritableStreams(*stream); + conn.streamManager->updateWritableStreams(*dsrStream); + conn.streamManager->updateLossStreams(*dsrStream); + + StreamFrameScheduler scheduler(conn); + EXPECT_TRUE(scheduler.hasPendingData()); + ShortHeader shortHeader1( + ProtectionType::KeyPhaseZero, + getTestConnectionId(), + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder1( + conn.udpSendPacketLen, + std::move(shortHeader1), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + builder1.encodePacketHeader(); + scheduler.writeStreams(builder1); + auto packet1 = std::move(builder1).buildPacket().packet; + updateConnection( + conn, folly::none, packet1, Clock::now(), 1000, 0, false /* isDSR */); + EXPECT_EQ(1, packet1.frames.size()); + auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); + EXPECT_EQ(streamId, writeStreamFrame1.streamId); + EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); + EXPECT_EQ(0, stream->writeBuffer.chainLength()); + EXPECT_EQ(1, stream->retransmissionBuffer.size()); + EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); + + EXPECT_FALSE(scheduler.hasPendingData()); +} + TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) { QuicServerConnectionState conn( FizzServerQuicHandshakeContext::Builder().build()); diff --git a/quic/dsr/frontend/Scheduler.cpp b/quic/dsr/frontend/Scheduler.cpp index 656321738..9d0f8d875 100644 --- a/quic/dsr/frontend/Scheduler.cpp +++ b/quic/dsr/frontend/Scheduler.cpp @@ -18,8 +18,9 @@ DSRStreamFrameScheduler::DSRStreamFrameScheduler( : conn_(conn) {} bool DSRStreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasDSRWritable() && - getSendConnFlowControlBytesWire(conn_) > 0; + return conn_.streamManager->hasDSRLoss() || + (conn_.streamManager->hasDSRWritable() && + getSendConnFlowControlBytesWire(conn_) > 0); } /** diff --git a/quic/dsr/frontend/test/WriteFunctionsTest.cpp b/quic/dsr/frontend/test/WriteFunctionsTest.cpp index b007f9e15..336296a64 100644 --- a/quic/dsr/frontend/test/WriteFunctionsTest.cpp +++ b/quic/dsr/frontend/test/WriteFunctionsTest.cpp @@ -170,4 +170,33 @@ TEST_F(WriteFunctionsTest, LossAndFreshTwoInstructionsInTwoPackets) { EXPECT_EQ(expectedSecondFrame, *packet2.frames[0].asWriteStreamFrame()); } +TEST_F( + WriteFunctionsTest, + LossAndFreshTwoInstructionsInTwoPacketsNoFlowControl) { + prepareFlowControlAndStreamLimit(); + auto streamId = prepareOneStream(1000); + auto stream = conn_.streamManager->findStream(streamId); + auto bufMetaStartingOffset = stream->writeBufMeta.offset; + // Move part of the BufMetas to lossBufMetas + auto split = stream->writeBufMeta.split(500); + stream->lossBufMetas.push_back(split); + conn_.streamManager->updateLossStreams(*stream); + // Zero out conn flow control. + conn_.flowControlState.sumCurWriteOffset = + conn_.flowControlState.peerAdvertisedMaxOffset; + size_t packetLimit = 10; + // Should only write lost data + EXPECT_EQ( + 1, + writePacketizationRequest( + conn_, getTestConnectionId(), packetLimit, *aead_)); + EXPECT_EQ(1, countInstructions(streamId)); + ASSERT_EQ(1, conn_.outstandings.packets.size()); + auto& packet1 = conn_.outstandings.packets.front().packet; + EXPECT_EQ(1, packet1.frames.size()); + WriteStreamFrame expectedFirstFrame( + streamId, bufMetaStartingOffset, 500, false, true); + EXPECT_EQ(expectedFirstFrame, *packet1.frames[0].asWriteStreamFrame()); +} + } // namespace quic::test diff --git a/quic/loss/QuicLossFunctions.cpp b/quic/loss/QuicLossFunctions.cpp index a21ac03c5..3a26bf300 100644 --- a/quic/loss/QuicLossFunctions.cpp +++ b/quic/loss/QuicLossFunctions.cpp @@ -161,7 +161,7 @@ void markPacketLoss( stream->retransmissionBufMetas.erase(retxBufMetaItr); } conn.streamManager->updateWritableStreams(*stream); - conn.streamManager->addLoss(stream->id); + conn.streamManager->updateLossStreams(*stream); break; } case QuicWriteFrame::Type::WriteCryptoFrame: { diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index 899e152b5..1c12e4299 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -127,6 +127,7 @@ class QuicStreamManager { windowUpdates_ = std::move(other.windowUpdates_); flowControlUpdated_ = std::move(other.flowControlUpdated_); lossStreams_ = std::move(other.lossStreams_); + lossDSRStreams_ = std::move(other.lossDSRStreams_); readableStreams_ = std::move(other.readableStreams_); peekableStreams_ = std::move(other.peekableStreams_); writableStreams_ = std::move(other.writableStreams_); @@ -358,14 +359,28 @@ class QuicStreamManager { } } + // Considers _any_ type of stream data being lost. FOLLY_NODISCARD bool hasLoss() const { + return !lossStreams_.empty() || !lossDSRStreams_.empty(); + } + + // Considers non-DSR data being lost. + FOLLY_NODISCARD bool hasNonDSRLoss() const { return !lossStreams_.empty(); } - void removeLoss(StreamId id) { - lossStreams_.erase(id); + // Considers non-DSR data being lost. + FOLLY_NODISCARD bool hasDSRLoss() const { + return !lossDSRStreams_.empty(); } + // Should only used directly by tests. + void removeLoss(StreamId id) { + lossStreams_.erase(id); + lossDSRStreams_.erase(id); + } + + // Should only used directly by tests. void addLoss(StreamId id) { lossStreams_.insert(id); } @@ -374,7 +389,12 @@ class QuicStreamManager { if (!stream.hasLoss()) { removeLoss(stream.id); } else { - addLoss(stream.id); + if (!stream.lossBuffer.empty()) { + lossStreams_.emplace(stream.id); + } + if (!stream.lossBufMetas.empty()) { + lossDSRStreams_.emplace(stream.id); + } } } @@ -1137,6 +1157,9 @@ class QuicStreamManager { // Streams that have bytes in loss buffer folly::F14FastSet lossStreams_; + // DSR Streams that have bytes in loss buff meta + folly::F14FastSet lossDSRStreams_; + // Set of streams that have pending reads folly::F14FastSet readableStreams_; diff --git a/quic/state/stream/StreamStateFunctions.cpp b/quic/state/stream/StreamStateFunctions.cpp index a549fdc4d..f343f6f3f 100644 --- a/quic/state/stream/StreamStateFunctions.cpp +++ b/quic/state/stream/StreamStateFunctions.cpp @@ -27,7 +27,7 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) { } stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream); - stream.conn.streamManager->removeLoss(stream.id); + stream.conn.streamManager->updateLossStreams(stream); } void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) {