diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index d5508f5b7..5fce23eb1 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -433,7 +433,7 @@ void StreamFrameScheduler::writeStreamsHelper( } else { // walk the sequential streams in order until we run out of space for (auto streamIt = level.streams.begin(); - streamIt != level.streams.end() && connWritableBytes > 0; + streamIt != level.streams.end(); ++streamIt) { auto stream = conn_.streamManager->findStream(*streamIt); CHECK(stream); @@ -473,8 +473,9 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) { } // namespace quic bool StreamFrameScheduler::hasPendingData() const { - return conn_.streamManager->hasNonDSRWritable() && - getSendConnFlowControlBytesWire(conn_) > 0; + return conn_.streamManager->hasLoss() || + (conn_.streamManager->hasNonDSRWritable() && + getSendConnFlowControlBytesWire(conn_) > 0); } bool StreamFrameScheduler::writeStreamFrame( diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 6479ff625..2fa874787 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -1609,8 +1609,10 @@ WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn) { if (conn.streamManager->hasBlocked()) { return WriteDataReason::BLOCKED; } - if (getSendConnFlowControlBytesWire(conn) != 0 && - conn.streamManager->hasWritable()) { + // If we have lost data or flow control + stream data. + if (conn.streamManager->hasLoss() || + (getSendConnFlowControlBytesWire(conn) != 0 && + conn.streamManager->hasWritable())) { return WriteDataReason::STREAM; } if (!conn.pendingEvents.frames.empty()) { diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 4552ee672..dc68a6795 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -1674,6 +1674,7 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) { conn.streamManager->updateWritableStreams(*stream); StreamFrameScheduler scheduler(conn); + EXPECT_TRUE(scheduler.hasPendingData()); ShortHeader shortHeader1( ProtectionType::KeyPhaseZero, getTestConnectionId(), @@ -1699,6 +1700,75 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) { stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0])); stream->retransmissionBuffer.clear(); conn.streamManager->updateWritableStreams(*stream); + conn.streamManager->updateLossStreams(*stream); + EXPECT_TRUE(scheduler.hasPendingData()); + + // Write again + ShortHeader shortHeader2( + ProtectionType::KeyPhaseZero, + getTestConnectionId(), + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder2( + conn.udpSendPacketLen, + std::move(shortHeader2), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + builder2.encodePacketHeader(); + scheduler.writeStreams(builder2); + auto packet2 = std::move(builder2).buildPacket().packet; + updateConnection( + conn, folly::none, packet2, Clock::now(), 1000, 0, false /* isDSR */); + EXPECT_EQ(1, packet2.frames.size()); + auto& writeStreamFrame2 = *packet2.frames[0].asWriteStreamFrame(); + EXPECT_EQ(streamId, writeStreamFrame2.streamId); + EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); + EXPECT_TRUE(stream->lossBuffer.empty()); + EXPECT_EQ(1, stream->retransmissionBuffer.size()); + EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); +} + +TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) { + QuicServerConnectionState conn( + FizzServerQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 1000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 1000; + + auto streamId = (*conn.streamManager->createNextBidirectionalStream())->id; + conn.streamManager->setStreamPriority(streamId, 0, false); + auto stream = conn.streamManager->findStream(streamId); + auto data = buildRandomInputData(1000); + writeDataToQuicStream(*stream, std::move(data), true); + conn.streamManager->updateWritableStreams(*stream); + + StreamFrameScheduler scheduler(conn); + EXPECT_TRUE(scheduler.hasPendingData()); + ShortHeader shortHeader1( + ProtectionType::KeyPhaseZero, + getTestConnectionId(), + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder1( + conn.udpSendPacketLen, + std::move(shortHeader1), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + builder1.encodePacketHeader(); + scheduler.writeStreams(builder1); + auto packet1 = std::move(builder1).buildPacket().packet; + updateConnection( + conn, folly::none, packet1, Clock::now(), 1000, 0, false /* isDSR */); + EXPECT_EQ(1, packet1.frames.size()); + auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); + EXPECT_EQ(streamId, writeStreamFrame1.streamId); + EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); + EXPECT_EQ(0, stream->writeBuffer.chainLength()); + EXPECT_EQ(1, stream->retransmissionBuffer.size()); + EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); + + // Move the bytes to loss buffer: + stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0])); + stream->retransmissionBuffer.clear(); + conn.streamManager->updateWritableStreams(*stream); + conn.streamManager->updateLossStreams(*stream); + EXPECT_TRUE(scheduler.hasPendingData()); // Write again ShortHeader shortHeader2( diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index 4ffa03cca..b0a2af603 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -3451,6 +3451,24 @@ TEST_F(QuicTransportFunctionsTest, ShouldWriteDataNoConnFlowControl) { EXPECT_EQ(WriteDataReason::NO_WRITE, shouldWriteData(*conn)); } +TEST_F(QuicTransportFunctionsTest, ShouldWriteDataNoConnFlowControlLoss) { + auto conn = createConn(); + conn->oneRttWriteCipher = test::createNoOpAead(); + auto mockCongestionController = + std::make_unique>(); + auto rawCongestionController = mockCongestionController.get(); + EXPECT_CALL(*rawCongestionController, getWritableBytes()) + .WillRepeatedly(Return(1500)); + auto stream1 = conn->streamManager->createNextBidirectionalStream().value(); + auto buf = IOBuf::copyBuffer("0123456789"); + writeDataToQuicStream(*stream1, buf->clone(), false); + EXPECT_NE(WriteDataReason::NO_WRITE, shouldWriteData(*conn)); + // Artificially limit the connection flow control. + conn->streamManager->addLoss(stream1->id); + conn->flowControlState.peerAdvertisedMaxOffset = 0; + EXPECT_NE(WriteDataReason::NO_WRITE, shouldWriteData(*conn)); +} + TEST_F(QuicTransportFunctionsTest, HasAckDataToWriteCipherAndAckStateMatch) { auto conn = createConn(); EXPECT_FALSE(hasAckDataToWrite(*conn)); diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 0fa7e0647..cc6327227 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -270,7 +270,17 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLoss) { auto stream = transport_->createBidirectionalStream().value(); auto lossStream = transport_->createBidirectionalStream().value(); - conn.streamManager->addLoss(lossStream); + auto lossStreamState = conn.streamManager->findStream(lossStream); + ASSERT_TRUE(lossStreamState); + auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 20, 4096); + auto curBuf = largeBuf.get(); + do { + curBuf->append(curBuf->capacity()); + curBuf = curBuf->next(); + } while (curBuf != largeBuf.get()); + lossStreamState->lossBuffer.emplace_back(std::move(largeBuf), 31, false); + conn.streamManager->updateWritableStreams(*lossStreamState); + conn.streamManager->updateLossStreams(*lossStreamState); transport_->writeChain( stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0);