mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-08 09:42:06 +03:00
QUIC flow control accounts for DSR bytes
Summary: This was completely missing previously, which led to Client quickly shutting down a connection with flow control violation when server oversends in DSR mode. Reviewed By: mjoras Differential Revision: D27940953 fbshipit-source-id: 5644c1a3da5217365df9de33258bb5b071ff8187
This commit is contained in:
committed by
Facebook GitHub Bot
parent
5590b9f573
commit
62b0d8b879
@@ -481,8 +481,10 @@ bool StreamFrameScheduler::writeStreamFrame(
|
||||
uint64_t flowControlLen =
|
||||
std::min(getSendStreamFlowControlBytesWire(stream), connWritableBytes);
|
||||
uint64_t bufferLen = stream.writeBuffer.chainLength();
|
||||
bool canWriteFin =
|
||||
stream.finalWriteOffset.has_value() && bufferLen <= flowControlLen;
|
||||
// We can't write FIN directly from here if writeBufMeta has pending bytes to
|
||||
// send.
|
||||
bool canWriteFin = stream.finalWriteOffset.has_value() &&
|
||||
bufferLen <= flowControlLen && stream.writeBufMeta.length == 0;
|
||||
auto dataLen = writeStreamFrameHeader(
|
||||
builder,
|
||||
stream.id,
|
||||
|
@@ -16,6 +16,8 @@
|
||||
#include <quic/codec/QuicPacketBuilder.h>
|
||||
#include <quic/codec/test/Mocks.h>
|
||||
#include <quic/common/test/TestUtils.h>
|
||||
#include <quic/dsr/Types.h>
|
||||
#include <quic/dsr/test/Mocks.h>
|
||||
#include <quic/fizz/client/handshake/FizzClientQuicHandshakeContext.h>
|
||||
#include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
|
||||
#include <quic/server/state/ServerStateMachine.h>
|
||||
@@ -1739,6 +1741,39 @@ TEST_F(QuicPacketSchedulerTest, RunOutFlowControlDuringStreamWrite) {
|
||||
EXPECT_EQ(200, stream2->retransmissionBuffer[0]->data.chainLength());
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, NoFinWhenThereIsPendingWriteBuf) {
|
||||
QuicServerConnectionState conn(
|
||||
FizzServerQuicHandshakeContext::Builder().build());
|
||||
conn.streamManager->setMaxLocalBidirectionalStreams(10);
|
||||
conn.flowControlState.peerAdvertisedMaxOffset = 100000;
|
||||
auto* stream = *(conn.streamManager->createNextBidirectionalStream());
|
||||
stream->flowControlState.peerAdvertisedMaxOffset = 100000;
|
||||
|
||||
writeDataToQuicStream(*stream, folly::IOBuf::copyBuffer("Ascent"), false);
|
||||
stream->dsrSender = std::make_unique<MockDSRPacketizationRequestSender>();
|
||||
BufferMeta bufferMeta(5000);
|
||||
writeBufMetaToQuicStream(*stream, bufferMeta, true);
|
||||
EXPECT_TRUE(stream->finalWriteOffset.hasValue());
|
||||
PacketNum packetNum = 0;
|
||||
ShortHeader header(
|
||||
ProtectionType::KeyPhaseOne,
|
||||
conn.clientConnectionId.value_or(getTestConnectionId()),
|
||||
packetNum);
|
||||
RegularQuicPacketBuilder builder(
|
||||
conn.udpSendPacketLen,
|
||||
std::move(header),
|
||||
conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0));
|
||||
builder.encodePacketHeader();
|
||||
StreamFrameScheduler scheduler(conn);
|
||||
scheduler.writeStreams(builder);
|
||||
auto packet = std::move(builder).buildPacket().packet;
|
||||
EXPECT_EQ(1, packet.frames.size());
|
||||
auto streamFrame = *packet.frames[0].asWriteStreamFrame();
|
||||
EXPECT_EQ(streamFrame.len, 6);
|
||||
EXPECT_EQ(streamFrame.offset, 0);
|
||||
EXPECT_FALSE(streamFrame.fin);
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
QuicPacketSchedulerTests,
|
||||
QuicPacketSchedulerTest,
|
||||
|
@@ -71,12 +71,21 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream(
|
||||
if (!hasFreshBufMeta || builder.remainingSpace() == 0) {
|
||||
return result;
|
||||
}
|
||||
// If we have fresh BufMeta to write, the offset cannot be 0. This is based on
|
||||
// the current limit that some real data has to be written into the stream
|
||||
// before BufMetas.
|
||||
CHECK_NE(stream->writeBufMeta.offset, 0);
|
||||
uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
|
||||
if (connWritableBytes == 0) {
|
||||
return result;
|
||||
}
|
||||
auto flowControlLen =
|
||||
std::min(getSendStreamFlowControlBytesWire(*stream), connWritableBytes);
|
||||
// When stream still has writeBuffer, getSendStreamFlowControlBytesWire counts
|
||||
// from currentWriteOffset which isn't right for BufMetas.
|
||||
auto streamFlowControlLen = std::min(
|
||||
getSendStreamFlowControlBytesWire(*stream),
|
||||
stream->flowControlState.peerAdvertisedMaxOffset -
|
||||
stream->writeBufMeta.offset);
|
||||
auto flowControlLen = std::min(streamFlowControlLen, connWritableBytes);
|
||||
bool canWriteFin = stream->finalWriteOffset.has_value() &&
|
||||
stream->writeBufMeta.length <= flowControlLen;
|
||||
SendInstruction::Builder instructionBuilder(conn_, *streamId);
|
||||
|
@@ -32,6 +32,7 @@ TEST_F(SchedulerTest, ScheduleStream) {
|
||||
DSRStreamFrameScheduler scheduler(conn_);
|
||||
EXPECT_FALSE(scheduler.hasPendingData());
|
||||
auto stream = *conn_.streamManager->createNextBidirectionalStream();
|
||||
stream->flowControlState.peerAdvertisedMaxOffset = 200;
|
||||
stream->dsrSender = std::make_unique<MockDSRPacketizationRequestSender>();
|
||||
writeDataToQuicStream(
|
||||
*stream, folly::IOBuf::copyBuffer("New York Bagles"), false);
|
||||
@@ -43,22 +44,51 @@ TEST_F(SchedulerTest, ScheduleStream) {
|
||||
conn_.streamManager->hasDSRWritable());
|
||||
EXPECT_TRUE(scheduler.hasPendingData());
|
||||
EXPECT_CALL(builder_, remainingSpaceNonConst()).WillRepeatedly(Return(1000));
|
||||
uint64_t writtenLength = 0;
|
||||
EXPECT_CALL(builder_, addSendInstruction(_, _))
|
||||
.WillOnce(Invoke([&](SendInstruction&& instruction, uint32_t) {
|
||||
EXPECT_EQ(stream->id, (size_t)instruction.streamId);
|
||||
EXPECT_EQ(expectedBufMetaOffset, instruction.offset);
|
||||
EXPECT_EQ(200, instruction.len);
|
||||
EXPECT_TRUE(instruction.fin);
|
||||
EXPECT_GT(200, instruction.len);
|
||||
writtenLength = instruction.len;
|
||||
EXPECT_FALSE(instruction.fin);
|
||||
}));
|
||||
EXPECT_TRUE(scheduler.writeStream(builder_).writeSuccess);
|
||||
|
||||
auto writtenMeta = stream->writeBufMeta.split(200);
|
||||
EXPECT_EQ(0, stream->writeBufMeta.length);
|
||||
++stream->writeBufMeta.offset;
|
||||
auto writtenMeta = stream->writeBufMeta.split(writtenLength);
|
||||
auto nextExpectedOffset = stream->writeBufMeta.offset;
|
||||
EXPECT_GT(stream->writeBufMeta.length, 0);
|
||||
stream->retransmissionBufMetas.emplace(
|
||||
std::piecewise_construct,
|
||||
std::forward_as_tuple(expectedBufMetaOffset),
|
||||
std::forward_as_tuple(writtenMeta));
|
||||
// This is now flow control blocked:
|
||||
EXPECT_FALSE(stream->hasWritableBufMeta());
|
||||
conn_.streamManager->updateWritableStreams(*stream);
|
||||
EXPECT_FALSE(conn_.streamManager->hasDSRWritable());
|
||||
EXPECT_TRUE(conn_.streamManager->writableDSRStreams().empty());
|
||||
|
||||
stream->flowControlState.peerAdvertisedMaxOffset = 500;
|
||||
conn_.streamManager->updateWritableStreams(*stream);
|
||||
EXPECT_TRUE(conn_.streamManager->hasDSRWritable());
|
||||
EXPECT_FALSE(conn_.streamManager->writableDSRStreams().empty());
|
||||
EXPECT_CALL(builder_, addSendInstruction(_, _))
|
||||
.WillOnce(Invoke([&](SendInstruction&& instruction, uint32_t) {
|
||||
EXPECT_EQ(stream->id, (size_t)instruction.streamId);
|
||||
EXPECT_EQ(nextExpectedOffset, instruction.offset);
|
||||
EXPECT_GT(instruction.len, 0);
|
||||
writtenLength = instruction.len;
|
||||
EXPECT_TRUE(instruction.fin);
|
||||
}));
|
||||
EXPECT_TRUE(scheduler.writeStream(builder_).writeSuccess);
|
||||
|
||||
auto nextWrittenMeta = stream->writeBufMeta.split(writtenLength);
|
||||
EXPECT_EQ(stream->writeBufMeta.length, 0);
|
||||
stream->writeBufMeta.offset++;
|
||||
stream->retransmissionBufMetas.emplace(
|
||||
std::piecewise_construct,
|
||||
std::forward_as_tuple(nextExpectedOffset),
|
||||
std::forward_as_tuple(nextWrittenMeta));
|
||||
EXPECT_FALSE(stream->hasWritableBufMeta());
|
||||
conn_.streamManager->updateWritableStreams(*stream);
|
||||
EXPECT_FALSE(conn_.streamManager->hasDSRWritable());
|
||||
|
@@ -217,7 +217,7 @@ void updateFlowControlOnResetStream(QuicStreamState& stream) {
|
||||
void maybeWriteBlockAfterAPIWrite(QuicStreamState& stream) {
|
||||
// Only write blocked when stream becomes blocked
|
||||
if (getSendStreamFlowControlBytesWire(stream) == 0 &&
|
||||
stream.writeBuffer.empty()) {
|
||||
stream.writeBuffer.empty() && stream.writeBufMeta.length == 0) {
|
||||
stream.conn.streamManager->queueBlocked(
|
||||
stream.id, stream.flowControlState.peerAdvertisedMaxOffset);
|
||||
if (stream.conn.qLogger) {
|
||||
@@ -244,12 +244,14 @@ void maybeWriteDataBlockedAfterSocketWrite(QuicConnectionStateBase& conn) {
|
||||
void maybeWriteBlockAfterSocketWrite(QuicStreamState& stream) {
|
||||
// Only write blocked when the flow control bytes are used up and there are
|
||||
// still pending data
|
||||
if (stream.finalWriteOffset &&
|
||||
*stream.finalWriteOffset < stream.currentWriteOffset) {
|
||||
if (stream.streamWriteError) {
|
||||
return;
|
||||
}
|
||||
if (stream.finalWriteOffset && stream.hasSentFIN()) {
|
||||
return;
|
||||
}
|
||||
if (getSendStreamFlowControlBytesWire(stream) == 0 &&
|
||||
!stream.writeBuffer.empty()) {
|
||||
(!stream.writeBuffer.empty() || stream.writeBufMeta.length > 0)) {
|
||||
stream.conn.streamManager->queueBlocked(
|
||||
stream.id, stream.flowControlState.peerAdvertisedMaxOffset);
|
||||
if (stream.conn.qLogger) {
|
||||
@@ -273,7 +275,8 @@ void handleStreamWindowUpdate(
|
||||
if (stream.flowControlState.peerAdvertisedMaxOffset <= maximumData) {
|
||||
stream.flowControlState.peerAdvertisedMaxOffset = maximumData;
|
||||
if (stream.flowControlState.peerAdvertisedMaxOffset >
|
||||
stream.currentWriteOffset + stream.writeBuffer.chainLength()) {
|
||||
stream.currentWriteOffset + stream.writeBuffer.chainLength() +
|
||||
stream.writeBufMeta.length) {
|
||||
updateFlowControlList(stream);
|
||||
}
|
||||
stream.conn.streamManager->updateWritableStreams(stream);
|
||||
@@ -323,14 +326,15 @@ void handleStreamBlocked(QuicStreamState& stream) {
|
||||
uint64_t getSendStreamFlowControlBytesWire(const QuicStreamState& stream) {
|
||||
DCHECK_GE(
|
||||
stream.flowControlState.peerAdvertisedMaxOffset,
|
||||
stream.currentWriteOffset);
|
||||
stream.nextOffsetToWrite());
|
||||
return stream.flowControlState.peerAdvertisedMaxOffset -
|
||||
stream.currentWriteOffset;
|
||||
stream.nextOffsetToWrite();
|
||||
}
|
||||
|
||||
uint64_t getSendStreamFlowControlBytesAPI(const QuicStreamState& stream) {
|
||||
auto sendFlowControlBytes = getSendStreamFlowControlBytesWire(stream);
|
||||
auto dataInBuffer = stream.writeBuffer.chainLength();
|
||||
auto dataInBuffer =
|
||||
stream.writeBuffer.chainLength() + stream.writeBufMeta.length;
|
||||
if (dataInBuffer > sendFlowControlBytes) {
|
||||
return 0;
|
||||
} else {
|
||||
|
@@ -318,6 +318,15 @@ TEST_F(QuicFlowControlTest, MaybeWriteBlockedAfterSocketWrite) {
|
||||
EXPECT_CALL(*transportInfoCb_, onStreamFlowControlBlocked()).Times(1);
|
||||
maybeWriteBlockAfterSocketWrite(stream);
|
||||
EXPECT_TRUE(conn_.streamManager->hasBlocked());
|
||||
|
||||
// No block if everything till FIN has been sent
|
||||
conn_.streamManager->removeBlocked(id);
|
||||
EXPECT_FALSE(conn_.streamManager->hasBlocked());
|
||||
stream.finalWriteOffset =
|
||||
stream.currentWriteOffset + stream.writeBuffer.chainLength();
|
||||
stream.currentWriteOffset = *stream.finalWriteOffset + 1;
|
||||
maybeWriteBlockAfterSocketWrite(stream);
|
||||
EXPECT_FALSE(conn_.streamManager->hasBlocked());
|
||||
}
|
||||
|
||||
TEST_F(QuicFlowControlTest, MaybeSendStreamWindowUpdateChangeWindowLarger) {
|
||||
@@ -826,5 +835,26 @@ TEST_F(QuicFlowControlTest, OnStreamWindowUpdateSentWithoutPendingEvent) {
|
||||
EXPECT_FALSE(conn_.streamManager->pendingWindowUpdate(id));
|
||||
}
|
||||
|
||||
TEST_F(QuicFlowControlTest, StreamFlowControlWithBufMeta) {
|
||||
StreamId id = 0;
|
||||
QuicStreamState stream(id, conn_);
|
||||
stream.flowControlState.peerAdvertisedMaxOffset = 1000;
|
||||
stream.currentWriteOffset = 200;
|
||||
stream.writeBuffer.append(buildRandomInputData(100));
|
||||
EXPECT_EQ(800, getSendStreamFlowControlBytesWire(stream));
|
||||
EXPECT_EQ(700, getSendStreamFlowControlBytesAPI(stream));
|
||||
|
||||
stream.writeBufMeta.offset =
|
||||
stream.currentWriteOffset + stream.writeBuffer.chainLength();
|
||||
stream.writeBufMeta.length = 300;
|
||||
EXPECT_EQ(800, getSendStreamFlowControlBytesWire(stream));
|
||||
EXPECT_EQ(400, getSendStreamFlowControlBytesAPI(stream));
|
||||
|
||||
stream.currentWriteOffset += stream.writeBuffer.chainLength();
|
||||
stream.writeBuffer.move();
|
||||
EXPECT_EQ(700, getSendStreamFlowControlBytesWire(stream));
|
||||
EXPECT_EQ(400, getSendStreamFlowControlBytesAPI(stream));
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
} // namespace quic
|
||||
|
@@ -134,8 +134,11 @@ struct QuicStreamLike {
|
||||
|
||||
// Current offset of the start bytes in the write buffer.
|
||||
// This changes when we pop stuff off the writeBuffer.
|
||||
// When we are finished writing out all the bytes until FIN, this will
|
||||
// be one greater than finalWriteOffset.
|
||||
// In a non-DSR stream, when we are finished writing out all the bytes until
|
||||
// FIN, this will be one greater than finalWriteOffset.
|
||||
// When DSR is used, this still points to the starting bytes in the write
|
||||
// buffer. Its value won't change with WriteBufferMetas are appended and sent
|
||||
// for a stream.
|
||||
uint64_t currentWriteOffset{0};
|
||||
|
||||
// the minimum offset requires retransmit
|
||||
@@ -329,6 +332,28 @@ struct QuicStreamState : public QuicStreamLike {
|
||||
return false;
|
||||
}
|
||||
|
||||
FOLLY_NODISCARD bool hasSentFIN() const {
|
||||
if (!finalWriteOffset) {
|
||||
return false;
|
||||
}
|
||||
return currentWriteOffset > *finalWriteOffset ||
|
||||
writeBufMeta.offset > *finalWriteOffset;
|
||||
}
|
||||
|
||||
FOLLY_NODISCARD uint64_t nextOffsetToWrite() const {
|
||||
// The stream has never had WriteBufferMetas. Then currentWriteOffset
|
||||
// always points to the next offset we send. This of course relies on the
|
||||
// current contract of DSR: Real data always comes first. This code (and a
|
||||
// lot other code) breaks when that contract is breached.
|
||||
if (writeBufMeta.offset == 0) {
|
||||
return currentWriteOffset;
|
||||
}
|
||||
if (!writeBuffer.empty()) {
|
||||
return currentWriteOffset;
|
||||
}
|
||||
return writeBufMeta.offset;
|
||||
}
|
||||
|
||||
bool hasReadableData() const {
|
||||
return (readBuffer.size() > 0 &&
|
||||
currentReadOffset == readBuffer.front().offset) ||
|
||||
|
Reference in New Issue
Block a user