mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-07-29 03:41:11 +03:00
Separately track streams by DSR and non DSR data lost.
Summary: We need to do this to maintain consistency with the schedulers. Prior to this fix when we ran out of connection flow control and had lost DSR data the non-DSR stream scheduler would do an empty write _and_ the DSR stream scheduler would do an empty write. To fully resolve the issue we need to track the streams separately (note that the same stream can be in both sets). Reviewed By: jbeshay Differential Revision: D40003361 fbshipit-source-id: fd3e567bc8a2bc8b71d4c4543894053fa6e24dc4
This commit is contained in:
committed by
Facebook GitHub Bot
parent
8d38c19348
commit
03e314a3a4
@ -512,7 +512,7 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
|
||||
} // namespace quic
|
||||
|
||||
bool StreamFrameScheduler::hasPendingData() const {
|
||||
return conn_.streamManager->hasLoss() ||
|
||||
return conn_.streamManager->hasNonDSRLoss() ||
|
||||
(conn_.streamManager->hasNonDSRWritable() &&
|
||||
getSendConnFlowControlBytesWire(conn_) > 0);
|
||||
}
|
||||
|
@ -1844,6 +1844,53 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) {
|
||||
EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength());
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) {
|
||||
QuicServerConnectionState conn(
|
||||
FizzServerQuicHandshakeContext::Builder().build());
|
||||
conn.streamManager->setMaxLocalBidirectionalStreams(10);
|
||||
conn.flowControlState.peerAdvertisedMaxOffset = 1000;
|
||||
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 1000;
|
||||
|
||||
auto streamId = (*conn.streamManager->createNextBidirectionalStream())->id;
|
||||
auto dsrStream = conn.streamManager->createNextBidirectionalStream().value();
|
||||
auto stream = conn.streamManager->findStream(streamId);
|
||||
auto data = buildRandomInputData(1000);
|
||||
writeDataToQuicStream(*stream, std::move(data), true);
|
||||
WriteBufferMeta bufMeta{};
|
||||
bufMeta.offset = 0;
|
||||
bufMeta.length = 100;
|
||||
bufMeta.eof = false;
|
||||
dsrStream->insertIntoLossBufMeta(bufMeta);
|
||||
conn.streamManager->updateWritableStreams(*stream);
|
||||
conn.streamManager->updateWritableStreams(*dsrStream);
|
||||
conn.streamManager->updateLossStreams(*dsrStream);
|
||||
|
||||
StreamFrameScheduler scheduler(conn);
|
||||
EXPECT_TRUE(scheduler.hasPendingData());
|
||||
ShortHeader shortHeader1(
|
||||
ProtectionType::KeyPhaseZero,
|
||||
getTestConnectionId(),
|
||||
getNextPacketNum(conn, PacketNumberSpace::AppData));
|
||||
RegularQuicPacketBuilder builder1(
|
||||
conn.udpSendPacketLen,
|
||||
std::move(shortHeader1),
|
||||
conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0));
|
||||
builder1.encodePacketHeader();
|
||||
scheduler.writeStreams(builder1);
|
||||
auto packet1 = std::move(builder1).buildPacket().packet;
|
||||
updateConnection(
|
||||
conn, folly::none, packet1, Clock::now(), 1000, 0, false /* isDSR */);
|
||||
EXPECT_EQ(1, packet1.frames.size());
|
||||
auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame();
|
||||
EXPECT_EQ(streamId, writeStreamFrame1.streamId);
|
||||
EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn));
|
||||
EXPECT_EQ(0, stream->writeBuffer.chainLength());
|
||||
EXPECT_EQ(1, stream->retransmissionBuffer.size());
|
||||
EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength());
|
||||
|
||||
EXPECT_FALSE(scheduler.hasPendingData());
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) {
|
||||
QuicServerConnectionState conn(
|
||||
FizzServerQuicHandshakeContext::Builder().build());
|
||||
|
@ -18,8 +18,9 @@ DSRStreamFrameScheduler::DSRStreamFrameScheduler(
|
||||
: conn_(conn) {}
|
||||
|
||||
bool DSRStreamFrameScheduler::hasPendingData() const {
|
||||
return conn_.streamManager->hasDSRWritable() &&
|
||||
getSendConnFlowControlBytesWire(conn_) > 0;
|
||||
return conn_.streamManager->hasDSRLoss() ||
|
||||
(conn_.streamManager->hasDSRWritable() &&
|
||||
getSendConnFlowControlBytesWire(conn_) > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -170,4 +170,33 @@ TEST_F(WriteFunctionsTest, LossAndFreshTwoInstructionsInTwoPackets) {
|
||||
EXPECT_EQ(expectedSecondFrame, *packet2.frames[0].asWriteStreamFrame());
|
||||
}
|
||||
|
||||
TEST_F(
|
||||
WriteFunctionsTest,
|
||||
LossAndFreshTwoInstructionsInTwoPacketsNoFlowControl) {
|
||||
prepareFlowControlAndStreamLimit();
|
||||
auto streamId = prepareOneStream(1000);
|
||||
auto stream = conn_.streamManager->findStream(streamId);
|
||||
auto bufMetaStartingOffset = stream->writeBufMeta.offset;
|
||||
// Move part of the BufMetas to lossBufMetas
|
||||
auto split = stream->writeBufMeta.split(500);
|
||||
stream->lossBufMetas.push_back(split);
|
||||
conn_.streamManager->updateLossStreams(*stream);
|
||||
// Zero out conn flow control.
|
||||
conn_.flowControlState.sumCurWriteOffset =
|
||||
conn_.flowControlState.peerAdvertisedMaxOffset;
|
||||
size_t packetLimit = 10;
|
||||
// Should only write lost data
|
||||
EXPECT_EQ(
|
||||
1,
|
||||
writePacketizationRequest(
|
||||
conn_, getTestConnectionId(), packetLimit, *aead_));
|
||||
EXPECT_EQ(1, countInstructions(streamId));
|
||||
ASSERT_EQ(1, conn_.outstandings.packets.size());
|
||||
auto& packet1 = conn_.outstandings.packets.front().packet;
|
||||
EXPECT_EQ(1, packet1.frames.size());
|
||||
WriteStreamFrame expectedFirstFrame(
|
||||
streamId, bufMetaStartingOffset, 500, false, true);
|
||||
EXPECT_EQ(expectedFirstFrame, *packet1.frames[0].asWriteStreamFrame());
|
||||
}
|
||||
|
||||
} // namespace quic::test
|
||||
|
@ -161,7 +161,7 @@ void markPacketLoss(
|
||||
stream->retransmissionBufMetas.erase(retxBufMetaItr);
|
||||
}
|
||||
conn.streamManager->updateWritableStreams(*stream);
|
||||
conn.streamManager->addLoss(stream->id);
|
||||
conn.streamManager->updateLossStreams(*stream);
|
||||
break;
|
||||
}
|
||||
case QuicWriteFrame::Type::WriteCryptoFrame: {
|
||||
|
@ -127,6 +127,7 @@ class QuicStreamManager {
|
||||
windowUpdates_ = std::move(other.windowUpdates_);
|
||||
flowControlUpdated_ = std::move(other.flowControlUpdated_);
|
||||
lossStreams_ = std::move(other.lossStreams_);
|
||||
lossDSRStreams_ = std::move(other.lossDSRStreams_);
|
||||
readableStreams_ = std::move(other.readableStreams_);
|
||||
peekableStreams_ = std::move(other.peekableStreams_);
|
||||
writableStreams_ = std::move(other.writableStreams_);
|
||||
@ -358,14 +359,28 @@ class QuicStreamManager {
|
||||
}
|
||||
}
|
||||
|
||||
// Considers _any_ type of stream data being lost.
|
||||
FOLLY_NODISCARD bool hasLoss() const {
|
||||
return !lossStreams_.empty() || !lossDSRStreams_.empty();
|
||||
}
|
||||
|
||||
// Considers non-DSR data being lost.
|
||||
FOLLY_NODISCARD bool hasNonDSRLoss() const {
|
||||
return !lossStreams_.empty();
|
||||
}
|
||||
|
||||
void removeLoss(StreamId id) {
|
||||
lossStreams_.erase(id);
|
||||
// Considers non-DSR data being lost.
|
||||
FOLLY_NODISCARD bool hasDSRLoss() const {
|
||||
return !lossDSRStreams_.empty();
|
||||
}
|
||||
|
||||
// Should only used directly by tests.
|
||||
void removeLoss(StreamId id) {
|
||||
lossStreams_.erase(id);
|
||||
lossDSRStreams_.erase(id);
|
||||
}
|
||||
|
||||
// Should only used directly by tests.
|
||||
void addLoss(StreamId id) {
|
||||
lossStreams_.insert(id);
|
||||
}
|
||||
@ -374,7 +389,12 @@ class QuicStreamManager {
|
||||
if (!stream.hasLoss()) {
|
||||
removeLoss(stream.id);
|
||||
} else {
|
||||
addLoss(stream.id);
|
||||
if (!stream.lossBuffer.empty()) {
|
||||
lossStreams_.emplace(stream.id);
|
||||
}
|
||||
if (!stream.lossBufMetas.empty()) {
|
||||
lossDSRStreams_.emplace(stream.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1137,6 +1157,9 @@ class QuicStreamManager {
|
||||
// Streams that have bytes in loss buffer
|
||||
folly::F14FastSet<StreamId> lossStreams_;
|
||||
|
||||
// DSR Streams that have bytes in loss buff meta
|
||||
folly::F14FastSet<StreamId> lossDSRStreams_;
|
||||
|
||||
// Set of streams that have pending reads
|
||||
folly::F14FastSet<StreamId> readableStreams_;
|
||||
|
||||
|
@ -27,7 +27,7 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) {
|
||||
}
|
||||
stream.conn.streamManager->updateReadableStreams(stream);
|
||||
stream.conn.streamManager->updateWritableStreams(stream);
|
||||
stream.conn.streamManager->removeLoss(stream.id);
|
||||
stream.conn.streamManager->updateLossStreams(stream);
|
||||
}
|
||||
|
||||
void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) {
|
||||
|
Reference in New Issue
Block a user