1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Track stream bytes written

Summary:
Track the number of stream bytes written to the wire and expose in `quic::TransportInfo`.

Implemented as two counters:
- `totalStreamBytesSent` is a count of all stream bytes written to the wire; the sum of the lengths of stream frames in all packets sent
- `totalNewStreamBytesSent` is a count of all *new* stream bytes written to the wire -- a stream byte is new if it has not been transmitted before; the sum of the lengths of stream frames that have never been transmitted before in all packets sent

We can derive the total number of stream bytes retransmitted as `totalStreamBytesSent - totalNewStreamBytesSent`. We may want to deprecate the existing `totalBytesRetransmitted` counter because the name of that counter does not make it clear that it is stream bytes, and because only includes some types of retransmissions.

While `totalNewStreamBytesSent` is already captured as `flowControlState.sumCurWriteOffset`, I am not reusing that counter here to avoid having a dependency on the information we track for flow control, and to allow all relevant information to be captured in `LossState` (where other relevant fields already exist).

Reviewed By: yangchi

Differential Revision: D28061085

fbshipit-source-id: 73486a4ba3fc8f12959f68702dc58e61fdc21b65
This commit is contained in:
Brandon Schlinker
2021-05-04 23:23:18 -07:00
committed by Facebook GitHub Bot
parent 564acc5278
commit 5590b9f573
5 changed files with 381 additions and 18 deletions

View File

@@ -231,8 +231,8 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) {
writeDataToQuicStream(*stream1, buf->clone(), true);
writeDataToQuicStream(*stream2, buf->clone(), true);
WriteStreamFrame writeStreamFrame1(stream1->id, 0, 5, false),
writeStreamFrame2(stream2->id, 0, 12, true);
WriteStreamFrame writeStreamFrame1(stream1->id, 0, 5, false);
WriteStreamFrame writeStreamFrame2(stream2->id, 0, 12, true);
packet.packet.frames.push_back(std::move(writeStreamFrame1));
packet.packet.frames.push_back(std::move(writeStreamFrame2));
@@ -266,25 +266,23 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) {
currentNextAppDataPacketNum);
EXPECT_TRUE(conn->outstandings.packets.back().isAppLimited);
EXPECT_EQ(stream1->retransmissionBuffer.size(), 1);
auto& rt1 = *stream1->retransmissionBuffer.at(0);
EXPECT_EQ(stream1->currentWriteOffset, 5);
EXPECT_EQ(stream2->currentWriteOffset, 13);
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 17);
IOBufEqualTo eq;
EXPECT_EQ(stream1->retransmissionBuffer.size(), 1);
auto& rt1 = *stream1->retransmissionBuffer.at(0);
EXPECT_EQ(rt1.offset, 0);
EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt1.data.front()));
EXPECT_EQ(stream2->retransmissionBuffer.size(), 1);
auto& rt2 = *stream2->retransmissionBuffer.at(0);
EXPECT_EQ(rt2.offset, 0);
EXPECT_TRUE(eq(*buf, *rt2.data.front()));
EXPECT_TRUE(rt2.eof);
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 17);
// Testing retransmission
stream1->lossBuffer.push_back(std::move(rt1));
stream1->retransmissionBuffer.clear();
@@ -293,7 +291,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) {
conn->streamManager->addLoss(stream1->id);
conn->streamManager->addLoss(stream2->id);
// Write the remainder of the data
// Write the remainder of the data with retransmission
auto packet2 = buildEmptyPacket(*conn, PacketNumberSpace::Handshake);
WriteStreamFrame writeStreamFrame3(stream1->id, 5, 7, true);
WriteStreamFrame writeStreamFrame4(stream1->id, 0, 5, false);
@@ -315,8 +313,8 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) {
folly::none,
packet2.packet,
TimePoint(),
getEncodedSize(packet),
getEncodedBodySize(packet),
getEncodedSize(packet2),
getEncodedBodySize(packet2),
false /* isDSRPacket */);
EXPECT_EQ(
conn->ackStates.initialAckState.nextPacketNum,
@@ -330,7 +328,8 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) {
EXPECT_FALSE(conn->outstandings.packets.back().isAppLimited);
EXPECT_EQ(stream1->currentWriteOffset, 13);
EXPECT_EQ(stream1->currentWriteOffset, 13);
EXPECT_EQ(stream2->currentWriteOffset, 13);
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 24);
EXPECT_EQ(stream1->lossBuffer.size(), 0);
EXPECT_EQ(stream1->retransmissionBuffer.size(), 2);
@@ -386,6 +385,346 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) {
}
}
TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionPacketRetrans) {
const IOBufEqualTo eq;
auto conn = createConn();
auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get();
conn->congestionController = std::move(mockCongestionController);
conn->qLogger = std::make_shared<quic::FileQLogger>(VantagePoint::Client);
// two streams, both writing "hey whats up"
auto stream1Id =
conn->streamManager->createNextBidirectionalStream().value()->id;
auto stream2Id =
conn->streamManager->createNextBidirectionalStream().value()->id;
auto stream1 = conn->streamManager->findStream(stream1Id);
auto stream2 = conn->streamManager->findStream(stream2Id);
auto buf = IOBuf::copyBuffer("hey whats up");
writeDataToQuicStream(*stream1, buf->clone(), true /* eof */);
writeDataToQuicStream(*stream2, buf->clone(), true /* eof */);
WriteStreamFrame writeStreamFrame1(stream1->id, 0, 12, true /* eom */);
WriteStreamFrame writeStreamFrame2(stream2->id, 0, 12, true /* eom */);
EXPECT_EQ(stream1->currentWriteOffset, 0);
EXPECT_EQ(stream2->currentWriteOffset, 0);
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 0);
// add both stream frames into AppData packet1
auto packet1 = buildEmptyPacket(*conn, PacketNumberSpace::AppData);
packet1.packet.frames.push_back(writeStreamFrame1);
packet1.packet.frames.push_back(writeStreamFrame2);
// mimic send, call updateConnection
auto currentNextInitialPacketNum =
conn->ackStates.initialAckState.nextPacketNum;
auto currentNextHandshakePacketNum =
conn->ackStates.handshakeAckState.nextPacketNum;
auto currentNextAppDataPacketNum =
conn->ackStates.appDataAckState.nextPacketNum;
EXPECT_CALL(*rawCongestionController, onPacketSent(_)).Times(1);
EXPECT_CALL(*rawCongestionController, isAppLimited())
.Times(1)
.WillOnce(Return(true));
updateConnection(
*conn,
folly::none,
packet1.packet,
TimePoint{},
getEncodedSize(packet1),
getEncodedBodySize(packet1),
false /* isDSRPacket */);
// appData packet number should increase
EXPECT_EQ(
conn->ackStates.initialAckState.nextPacketNum,
currentNextInitialPacketNum); // no change
EXPECT_EQ(
conn->ackStates.handshakeAckState.nextPacketNum,
currentNextHandshakePacketNum); // no change
EXPECT_GT(
conn->ackStates.appDataAckState.nextPacketNum,
currentNextAppDataPacketNum); // increased
EXPECT_TRUE(conn->outstandings.packets.back().isAppLimited);
// offsets should be 13 (len + EOF) and 24 (bytes without EOF)
EXPECT_EQ(stream1->currentWriteOffset, 13); // len (12) + EOF (1)
EXPECT_EQ(stream2->currentWriteOffset, 13); // len (12) + EOF (1)
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 24); // sum(len)
// verify retransmission buffer and mark stream bytes in packet1 lost
{
ASSERT_EQ(stream1->retransmissionBuffer.size(), 1);
auto& rt = *stream1->retransmissionBuffer.at(0);
EXPECT_EQ(rt.offset, 0);
EXPECT_TRUE(eq(*buf, *rt.data.front()));
EXPECT_TRUE(rt.eof);
stream1->lossBuffer.push_back(std::move(rt));
}
{
ASSERT_EQ(stream2->retransmissionBuffer.size(), 1);
auto& rt = *stream2->retransmissionBuffer.at(0);
EXPECT_EQ(rt.offset, 0);
EXPECT_TRUE(eq(*buf, *rt.data.front()));
EXPECT_TRUE(rt.eof);
stream2->lossBuffer.push_back(std::move(rt));
}
stream1->retransmissionBuffer.clear();
stream2->retransmissionBuffer.clear();
conn->streamManager->addLoss(stream1->id);
conn->streamManager->addLoss(stream2->id);
// retransmit the lost frames in AppData packet2
auto packet2 = buildEmptyPacket(*conn, PacketNumberSpace::AppData);
packet2.packet.frames.push_back(writeStreamFrame1);
packet2.packet.frames.push_back(writeStreamFrame2);
// mimic send, call updateConnection
EXPECT_CALL(*rawCongestionController, onPacketSent(_)).Times(1);
currentNextInitialPacketNum = conn->ackStates.initialAckState.nextPacketNum;
currentNextHandshakePacketNum =
conn->ackStates.handshakeAckState.nextPacketNum;
currentNextAppDataPacketNum = conn->ackStates.appDataAckState.nextPacketNum;
EXPECT_CALL(*rawCongestionController, isAppLimited())
.Times(1)
.WillOnce(Return(false));
updateConnection(
*conn,
folly::none,
packet2.packet,
TimePoint(),
getEncodedSize(packet2),
getEncodedBodySize(packet2),
false /* isDSRPacket */);
EXPECT_EQ(
conn->ackStates.initialAckState.nextPacketNum,
currentNextInitialPacketNum); // no change
EXPECT_EQ(
conn->ackStates.handshakeAckState.nextPacketNum,
currentNextHandshakePacketNum); // no change
EXPECT_GT(
conn->ackStates.appDataAckState.nextPacketNum,
currentNextAppDataPacketNum); // increased
EXPECT_FALSE(conn->outstandings.packets.back().isAppLimited);
// since retransmission with no new data, no change in offsets
EXPECT_EQ(stream1->currentWriteOffset, 13); // len (12) + EOF (1)
EXPECT_EQ(stream2->currentWriteOffset, 13); // len (12) + EOF (1)
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 24); // sum(len)
// check loss state
CHECK_EQ(
conn->lossState.totalBytesSent,
getEncodedSize(packet1) + getEncodedSize(packet2));
CHECK_EQ(
conn->lossState.totalBodyBytesSent,
getEncodedBodySize(packet1) + getEncodedBodySize(packet2));
CHECK_EQ(conn->lossState.totalPacketsSent, 2);
// totalStreamBytesSent:
// the first packet contained 12 + 12
// the second packet contained 12 + 12
// total = 48
EXPECT_EQ(conn->lossState.totalStreamBytesSent, 48); // sum(len)
// totalNewStreamBytesSent: just sum(len)
EXPECT_EQ(conn->lossState.totalNewStreamBytesSent, 24);
EXPECT_EQ(
conn->lossState.totalNewStreamBytesSent,
conn->flowControlState.sumCurWriteOffset);
}
TEST_F(
QuicTransportFunctionsTest,
TestUpdateConnectionPacketRetransWithNewData) {
const IOBufEqualTo eq;
auto conn = createConn();
auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get();
conn->congestionController = std::move(mockCongestionController);
conn->qLogger = std::make_shared<quic::FileQLogger>(VantagePoint::Client);
// three streams, all writing "hey whats up"
//
// streams1 and 2 EOF after writing buffer, stream3 does not
//
// frames:
// stream1,frame1 contains the entire string with EOM
// stream2,frame1 contains "hey w", and thus no EOM
// stream3,frame1 contains the entire string, but no EOM given no EOF yet
//
// we'll write additional data to stream3 later
auto stream1Id =
conn->streamManager->createNextBidirectionalStream().value()->id;
auto stream2Id =
conn->streamManager->createNextBidirectionalStream().value()->id;
auto stream3Id =
conn->streamManager->createNextBidirectionalStream().value()->id;
auto stream1 = conn->streamManager->findStream(stream1Id);
auto stream2 = conn->streamManager->findStream(stream2Id);
auto stream3 = conn->streamManager->findStream(stream3Id);
auto buf = IOBuf::copyBuffer("hey whats up");
writeDataToQuicStream(*stream1, buf->clone(), true /* eof */);
writeDataToQuicStream(*stream2, buf->clone(), true /* eof */);
writeDataToQuicStream(*stream3, buf->clone(), false /* eof */);
WriteStreamFrame writeStreamFrame1(stream1->id, 0, 12, true /* eom */);
WriteStreamFrame writeStreamFrame2(stream2->id, 0, 5, false /* eom */);
WriteStreamFrame writeStreamFrame3(stream3->id, 0, 12, false /* eom */);
EXPECT_EQ(stream1->currentWriteOffset, 0);
EXPECT_EQ(stream2->currentWriteOffset, 0);
EXPECT_EQ(stream3->currentWriteOffset, 0);
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 0);
// add all stream frames into AppData packet1
auto packet1 = buildEmptyPacket(*conn, PacketNumberSpace::AppData);
packet1.packet.frames.push_back(writeStreamFrame1);
packet1.packet.frames.push_back(writeStreamFrame2);
packet1.packet.frames.push_back(writeStreamFrame3);
// mimic send, call updateConnection
auto currentNextInitialPacketNum =
conn->ackStates.initialAckState.nextPacketNum;
auto currentNextHandshakePacketNum =
conn->ackStates.handshakeAckState.nextPacketNum;
auto currentNextAppDataPacketNum =
conn->ackStates.appDataAckState.nextPacketNum;
EXPECT_CALL(*rawCongestionController, onPacketSent(_)).Times(1);
EXPECT_CALL(*rawCongestionController, isAppLimited())
.Times(1)
.WillOnce(Return(true));
updateConnection(
*conn,
folly::none,
packet1.packet,
TimePoint{},
getEncodedSize(packet1),
getEncodedBodySize(packet1),
false /* isDSRPacket */);
// appData packet number should increase
EXPECT_EQ(
conn->ackStates.initialAckState.nextPacketNum,
currentNextInitialPacketNum); // no change
EXPECT_EQ(
conn->ackStates.handshakeAckState.nextPacketNum,
currentNextHandshakePacketNum); // no change
EXPECT_GT(
conn->ackStates.appDataAckState.nextPacketNum,
currentNextAppDataPacketNum); // increased
EXPECT_TRUE(conn->outstandings.packets.back().isAppLimited);
// check offsets
EXPECT_EQ(stream1->currentWriteOffset, 13); // len (12) + EOF (1)
EXPECT_EQ(stream2->currentWriteOffset, 5); // len (5)
EXPECT_EQ(stream3->currentWriteOffset, 12); // len (12)
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 29); // sum(len)
// verify retransmission buffer and mark stream bytes in packet1 lost
{
ASSERT_EQ(stream1->retransmissionBuffer.size(), 1);
auto& rt = *stream1->retransmissionBuffer.at(0);
EXPECT_EQ(rt.offset, 0);
EXPECT_TRUE(eq(*buf, *rt.data.front()));
EXPECT_TRUE(rt.eof);
stream1->lossBuffer.push_back(std::move(rt));
}
{
ASSERT_EQ(stream2->retransmissionBuffer.size(), 1);
auto& rt = *stream2->retransmissionBuffer.at(0);
EXPECT_EQ(rt.offset, 0);
EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt.data.front()));
EXPECT_FALSE(rt.eof);
stream2->lossBuffer.push_back(std::move(rt));
}
{
ASSERT_EQ(stream3->retransmissionBuffer.size(), 1);
auto& rt = *stream3->retransmissionBuffer.at(0);
EXPECT_EQ(rt.offset, 0);
EXPECT_TRUE(eq(*buf, *rt.data.front()));
EXPECT_FALSE(rt.eof);
stream3->lossBuffer.push_back(std::move(rt));
}
stream1->retransmissionBuffer.clear();
stream2->retransmissionBuffer.clear();
stream3->retransmissionBuffer.clear();
conn->streamManager->addLoss(stream1->id);
conn->streamManager->addLoss(stream2->id);
conn->streamManager->addLoss(stream3->id);
// add some additional data
// write a "?" to stream3 and set eof
auto buf2 = IOBuf::copyBuffer("?");
writeDataToQuicStream(*stream3, buf->clone(), true /* eof */);
// packet2 contains orignally transmitted frames + new data frames
auto packet2 = buildEmptyPacket(*conn, PacketNumberSpace::AppData);
packet2.packet.frames.push_back(writeStreamFrame1);
packet2.packet.frames.push_back(writeStreamFrame2);
packet2.packet.frames.push_back(writeStreamFrame3);
packet2.packet.frames.push_back(WriteStreamFrame(
stream2->id, 5 /* offset */, 7 /* len */, true /* eom */));
packet2.packet.frames.push_back(WriteStreamFrame(
stream3->id, 12 /* offset */, 1 /* len */, true /* eom */));
// mimic send, call updateConnection
EXPECT_CALL(*rawCongestionController, onPacketSent(_)).Times(1);
currentNextInitialPacketNum = conn->ackStates.initialAckState.nextPacketNum;
currentNextHandshakePacketNum =
conn->ackStates.handshakeAckState.nextPacketNum;
currentNextAppDataPacketNum = conn->ackStates.appDataAckState.nextPacketNum;
EXPECT_CALL(*rawCongestionController, isAppLimited())
.Times(1)
.WillOnce(Return(false));
updateConnection(
*conn,
folly::none,
packet2.packet,
TimePoint(),
getEncodedSize(packet2),
getEncodedBodySize(packet2),
false /* isDSRPacket */);
EXPECT_EQ(
conn->ackStates.initialAckState.nextPacketNum,
currentNextInitialPacketNum); // no change
EXPECT_EQ(
conn->ackStates.handshakeAckState.nextPacketNum,
currentNextHandshakePacketNum); // no change
EXPECT_GT(
conn->ackStates.appDataAckState.nextPacketNum,
currentNextAppDataPacketNum); // increased
EXPECT_FALSE(conn->outstandings.packets.back().isAppLimited);
// check offsets
EXPECT_EQ(stream1->currentWriteOffset, 13); // len (12) + EOF (1)
EXPECT_EQ(stream2->currentWriteOffset, 13); // len (12) + EOF (1)
EXPECT_EQ(stream3->currentWriteOffset, 14); // len (13) + EOF (1)
EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 37); // sum(len)
// check loss state
CHECK_EQ(
conn->lossState.totalBytesSent,
getEncodedSize(packet1) + getEncodedSize(packet2));
CHECK_EQ(
conn->lossState.totalBodyBytesSent,
getEncodedBodySize(packet1) + getEncodedBodySize(packet2));
CHECK_EQ(conn->lossState.totalPacketsSent, 2);
// totalStreamBytesSent:
// the first packet contained 12 + 5 + 12 stream bytes
// the second packet contained 12 + 12 + 13 stream bytes
// total = 66
EXPECT_EQ(conn->lossState.totalStreamBytesSent, 66); // sum(len)
// totalNewStreamBytesSent: just sum(len)
EXPECT_EQ(conn->lossState.totalNewStreamBytesSent, 37);
EXPECT_EQ(
conn->lossState.totalNewStreamBytesSent,
conn->flowControlState.sumCurWriteOffset);
}
TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionD6DNotConsumeSendPing) {
auto conn = createConn();
conn->pendingEvents.sendPing = true; // Simulate application sendPing()