mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-09 20:42:44 +03:00
If there's lost data, we have data to write, take 2.
Summary: This is a bug that could prevent us from writing data if we ran out of connection flow control while we had lost data. The last attempt missed a mistake in the scheduling of sequential priority streams. Reviewed By: kvtsoy Differential Revision: D33030784 fbshipit-source-id: e1b82234346a604875a9ffe9ab7bc5fb398450ed
This commit is contained in:
committed by
Facebook GitHub Bot
parent
8cde858b62
commit
ca4705af0b
@@ -433,7 +433,7 @@ void StreamFrameScheduler::writeStreamsHelper(
|
|||||||
} else {
|
} else {
|
||||||
// walk the sequential streams in order until we run out of space
|
// walk the sequential streams in order until we run out of space
|
||||||
for (auto streamIt = level.streams.begin();
|
for (auto streamIt = level.streams.begin();
|
||||||
streamIt != level.streams.end() && connWritableBytes > 0;
|
streamIt != level.streams.end();
|
||||||
++streamIt) {
|
++streamIt) {
|
||||||
auto stream = conn_.streamManager->findStream(*streamIt);
|
auto stream = conn_.streamManager->findStream(*streamIt);
|
||||||
CHECK(stream);
|
CHECK(stream);
|
||||||
@@ -473,8 +473,9 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
|
|||||||
} // namespace quic
|
} // namespace quic
|
||||||
|
|
||||||
bool StreamFrameScheduler::hasPendingData() const {
|
bool StreamFrameScheduler::hasPendingData() const {
|
||||||
return conn_.streamManager->hasNonDSRWritable() &&
|
return conn_.streamManager->hasLoss() ||
|
||||||
getSendConnFlowControlBytesWire(conn_) > 0;
|
(conn_.streamManager->hasNonDSRWritable() &&
|
||||||
|
getSendConnFlowControlBytesWire(conn_) > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool StreamFrameScheduler::writeStreamFrame(
|
bool StreamFrameScheduler::writeStreamFrame(
|
||||||
|
@@ -1609,8 +1609,10 @@ WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn) {
|
|||||||
if (conn.streamManager->hasBlocked()) {
|
if (conn.streamManager->hasBlocked()) {
|
||||||
return WriteDataReason::BLOCKED;
|
return WriteDataReason::BLOCKED;
|
||||||
}
|
}
|
||||||
if (getSendConnFlowControlBytesWire(conn) != 0 &&
|
// If we have lost data or flow control + stream data.
|
||||||
conn.streamManager->hasWritable()) {
|
if (conn.streamManager->hasLoss() ||
|
||||||
|
(getSendConnFlowControlBytesWire(conn) != 0 &&
|
||||||
|
conn.streamManager->hasWritable())) {
|
||||||
return WriteDataReason::STREAM;
|
return WriteDataReason::STREAM;
|
||||||
}
|
}
|
||||||
if (!conn.pendingEvents.frames.empty()) {
|
if (!conn.pendingEvents.frames.empty()) {
|
||||||
|
@@ -1674,6 +1674,7 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) {
|
|||||||
conn.streamManager->updateWritableStreams(*stream);
|
conn.streamManager->updateWritableStreams(*stream);
|
||||||
|
|
||||||
StreamFrameScheduler scheduler(conn);
|
StreamFrameScheduler scheduler(conn);
|
||||||
|
EXPECT_TRUE(scheduler.hasPendingData());
|
||||||
ShortHeader shortHeader1(
|
ShortHeader shortHeader1(
|
||||||
ProtectionType::KeyPhaseZero,
|
ProtectionType::KeyPhaseZero,
|
||||||
getTestConnectionId(),
|
getTestConnectionId(),
|
||||||
@@ -1699,6 +1700,75 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) {
|
|||||||
stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0]));
|
stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0]));
|
||||||
stream->retransmissionBuffer.clear();
|
stream->retransmissionBuffer.clear();
|
||||||
conn.streamManager->updateWritableStreams(*stream);
|
conn.streamManager->updateWritableStreams(*stream);
|
||||||
|
conn.streamManager->updateLossStreams(*stream);
|
||||||
|
EXPECT_TRUE(scheduler.hasPendingData());
|
||||||
|
|
||||||
|
// Write again
|
||||||
|
ShortHeader shortHeader2(
|
||||||
|
ProtectionType::KeyPhaseZero,
|
||||||
|
getTestConnectionId(),
|
||||||
|
getNextPacketNum(conn, PacketNumberSpace::AppData));
|
||||||
|
RegularQuicPacketBuilder builder2(
|
||||||
|
conn.udpSendPacketLen,
|
||||||
|
std::move(shortHeader2),
|
||||||
|
conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0));
|
||||||
|
builder2.encodePacketHeader();
|
||||||
|
scheduler.writeStreams(builder2);
|
||||||
|
auto packet2 = std::move(builder2).buildPacket().packet;
|
||||||
|
updateConnection(
|
||||||
|
conn, folly::none, packet2, Clock::now(), 1000, 0, false /* isDSR */);
|
||||||
|
EXPECT_EQ(1, packet2.frames.size());
|
||||||
|
auto& writeStreamFrame2 = *packet2.frames[0].asWriteStreamFrame();
|
||||||
|
EXPECT_EQ(streamId, writeStreamFrame2.streamId);
|
||||||
|
EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn));
|
||||||
|
EXPECT_TRUE(stream->lossBuffer.empty());
|
||||||
|
EXPECT_EQ(1, stream->retransmissionBuffer.size());
|
||||||
|
EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) {
|
||||||
|
QuicServerConnectionState conn(
|
||||||
|
FizzServerQuicHandshakeContext::Builder().build());
|
||||||
|
conn.streamManager->setMaxLocalBidirectionalStreams(10);
|
||||||
|
conn.flowControlState.peerAdvertisedMaxOffset = 1000;
|
||||||
|
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 1000;
|
||||||
|
|
||||||
|
auto streamId = (*conn.streamManager->createNextBidirectionalStream())->id;
|
||||||
|
conn.streamManager->setStreamPriority(streamId, 0, false);
|
||||||
|
auto stream = conn.streamManager->findStream(streamId);
|
||||||
|
auto data = buildRandomInputData(1000);
|
||||||
|
writeDataToQuicStream(*stream, std::move(data), true);
|
||||||
|
conn.streamManager->updateWritableStreams(*stream);
|
||||||
|
|
||||||
|
StreamFrameScheduler scheduler(conn);
|
||||||
|
EXPECT_TRUE(scheduler.hasPendingData());
|
||||||
|
ShortHeader shortHeader1(
|
||||||
|
ProtectionType::KeyPhaseZero,
|
||||||
|
getTestConnectionId(),
|
||||||
|
getNextPacketNum(conn, PacketNumberSpace::AppData));
|
||||||
|
RegularQuicPacketBuilder builder1(
|
||||||
|
conn.udpSendPacketLen,
|
||||||
|
std::move(shortHeader1),
|
||||||
|
conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0));
|
||||||
|
builder1.encodePacketHeader();
|
||||||
|
scheduler.writeStreams(builder1);
|
||||||
|
auto packet1 = std::move(builder1).buildPacket().packet;
|
||||||
|
updateConnection(
|
||||||
|
conn, folly::none, packet1, Clock::now(), 1000, 0, false /* isDSR */);
|
||||||
|
EXPECT_EQ(1, packet1.frames.size());
|
||||||
|
auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame();
|
||||||
|
EXPECT_EQ(streamId, writeStreamFrame1.streamId);
|
||||||
|
EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn));
|
||||||
|
EXPECT_EQ(0, stream->writeBuffer.chainLength());
|
||||||
|
EXPECT_EQ(1, stream->retransmissionBuffer.size());
|
||||||
|
EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength());
|
||||||
|
|
||||||
|
// Move the bytes to loss buffer:
|
||||||
|
stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0]));
|
||||||
|
stream->retransmissionBuffer.clear();
|
||||||
|
conn.streamManager->updateWritableStreams(*stream);
|
||||||
|
conn.streamManager->updateLossStreams(*stream);
|
||||||
|
EXPECT_TRUE(scheduler.hasPendingData());
|
||||||
|
|
||||||
// Write again
|
// Write again
|
||||||
ShortHeader shortHeader2(
|
ShortHeader shortHeader2(
|
||||||
|
@@ -3451,6 +3451,24 @@ TEST_F(QuicTransportFunctionsTest, ShouldWriteDataNoConnFlowControl) {
|
|||||||
EXPECT_EQ(WriteDataReason::NO_WRITE, shouldWriteData(*conn));
|
EXPECT_EQ(WriteDataReason::NO_WRITE, shouldWriteData(*conn));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(QuicTransportFunctionsTest, ShouldWriteDataNoConnFlowControlLoss) {
|
||||||
|
auto conn = createConn();
|
||||||
|
conn->oneRttWriteCipher = test::createNoOpAead();
|
||||||
|
auto mockCongestionController =
|
||||||
|
std::make_unique<NiceMock<MockCongestionController>>();
|
||||||
|
auto rawCongestionController = mockCongestionController.get();
|
||||||
|
EXPECT_CALL(*rawCongestionController, getWritableBytes())
|
||||||
|
.WillRepeatedly(Return(1500));
|
||||||
|
auto stream1 = conn->streamManager->createNextBidirectionalStream().value();
|
||||||
|
auto buf = IOBuf::copyBuffer("0123456789");
|
||||||
|
writeDataToQuicStream(*stream1, buf->clone(), false);
|
||||||
|
EXPECT_NE(WriteDataReason::NO_WRITE, shouldWriteData(*conn));
|
||||||
|
// Artificially limit the connection flow control.
|
||||||
|
conn->streamManager->addLoss(stream1->id);
|
||||||
|
conn->flowControlState.peerAdvertisedMaxOffset = 0;
|
||||||
|
EXPECT_NE(WriteDataReason::NO_WRITE, shouldWriteData(*conn));
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(QuicTransportFunctionsTest, HasAckDataToWriteCipherAndAckStateMatch) {
|
TEST_F(QuicTransportFunctionsTest, HasAckDataToWriteCipherAndAckStateMatch) {
|
||||||
auto conn = createConn();
|
auto conn = createConn();
|
||||||
EXPECT_FALSE(hasAckDataToWrite(*conn));
|
EXPECT_FALSE(hasAckDataToWrite(*conn));
|
||||||
|
@@ -270,7 +270,17 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLoss) {
|
|||||||
|
|
||||||
auto stream = transport_->createBidirectionalStream().value();
|
auto stream = transport_->createBidirectionalStream().value();
|
||||||
auto lossStream = transport_->createBidirectionalStream().value();
|
auto lossStream = transport_->createBidirectionalStream().value();
|
||||||
conn.streamManager->addLoss(lossStream);
|
auto lossStreamState = conn.streamManager->findStream(lossStream);
|
||||||
|
ASSERT_TRUE(lossStreamState);
|
||||||
|
auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 20, 4096);
|
||||||
|
auto curBuf = largeBuf.get();
|
||||||
|
do {
|
||||||
|
curBuf->append(curBuf->capacity());
|
||||||
|
curBuf = curBuf->next();
|
||||||
|
} while (curBuf != largeBuf.get());
|
||||||
|
lossStreamState->lossBuffer.emplace_back(std::move(largeBuf), 31, false);
|
||||||
|
conn.streamManager->updateWritableStreams(*lossStreamState);
|
||||||
|
conn.streamManager->updateLossStreams(*lossStreamState);
|
||||||
transport_->writeChain(
|
transport_->writeChain(
|
||||||
stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
|
stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
|
||||||
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0);
|
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0);
|
||||||
|
Reference in New Issue
Block a user