1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-06 22:22:38 +03:00

Count cumulative # of egress packets for a stream

Summary:
There are situations where application needs to know how many packets were transmitted for a stream on certain byte range. This diff provides *some* level of that information, by adding the `cumulativeTxedPackets` field in `StreamLike`, which stores the number of egress packets that contain new STREAM frame(s) for a stream.

~~To prevent double-counting, I added a fast set of stream ids.~~

Application could rely on other callbacks (e.g. ByteEventCallback or DeliveryCallback) to get notified, and use `getStreamTransportInfo` to get `packetsTxed` for a stream.

Reviewed By: bschlinker, mjoras

Differential Revision: D23361789

fbshipit-source-id: 6624ddcbe9cf62c628f936eda2a39d0fc2781636
This commit is contained in:
Xiaoting Tang
2020-09-01 15:49:10 -07:00
committed by Facebook GitHub Bot
parent 820f102401
commit 3fac7d21f4
7 changed files with 165 additions and 2 deletions

View File

@@ -150,6 +150,9 @@ class QuicSocket {
// Is the stream head-of-line blocked? // Is the stream head-of-line blocked?
bool isHolb{false}; bool isHolb{false};
// Number of packets transmitted that carry new STREAM frame for this stream
uint64_t numPacketsTxWithNewData{0};
}; };
/** /**

View File

@@ -2867,8 +2867,11 @@ QuicTransportBase::getStreamTransportInfo(StreamId id) const {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
} }
auto stream = conn_->streamManager->getStream(id); auto stream = conn_->streamManager->getStream(id);
return StreamTransportInfo{ auto packets = getNumPacketsTxWithNewData(*stream);
stream->totalHolbTime, stream->holbCount, bool(stream->lastHolbTime)}; return StreamTransportInfo{stream->totalHolbTime,
stream->holbCount,
bool(stream->lastHolbTime),
packets};
} }
void QuicTransportBase::describe(std::ostream& os) const { void QuicTransportBase::describe(std::ostream& os) const {

View File

@@ -433,6 +433,10 @@ bool handleStreamWritten(
PacketNumberSpace packetNumberSpace) { PacketNumberSpace packetNumberSpace) {
// Handle new data first // Handle new data first
if (frameOffset == stream.currentWriteOffset) { if (frameOffset == stream.currentWriteOffset) {
// Count packet. It's based on the assumption that schedluing scheme will
// only writes one STREAM frame for a stream in a packet. If that doesn't
// hold, we need to avoid double-counting.
stream.numPacketsTxWithNewData++;
handleNewStreamDataWritten( handleNewStreamDataWritten(
conn, stream, frameLen, frameFin, packetNum, packetNumberSpace); conn, stream, frameLen, frameFin, packetNum, packetNumberSpace);
return true; return true;

View File

@@ -3141,5 +3141,144 @@ TEST_F(QuicTransportTest, SaneCwndSettings) {
conn.congestionController->getCongestionWindow()); conn.congestionController->getCongestionWindow());
} }
TEST_F(QuicTransportTest, GetStreamPackestTxedSingleByte) {
StrictMock<MockByteEventCallback> firstByteTxCb;
auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(1);
transport_->writeChain(
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb);
// when first byte TX callback gets invoked, numPacketsTxWithNewData should be
// one
EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0)))
.Times(1)
.WillOnce(Invoke([&](QuicSocket::ByteEvent /* event */) {
auto info = *transport_->getStreamTransportInfo(stream);
EXPECT_EQ(info.numPacketsTxWithNewData, 1);
}));
loopForWrites();
Mock::VerifyAndClearExpectations(&firstByteTxCb);
}
TEST_F(QuicTransportTest, GetStreamPacketsTxedMultipleBytes) {
const uint64_t streamBytes = 10;
const uint64_t lastByte = streamBytes - 1;
StrictMock<MockByteEventCallback> firstByteTxCb;
StrictMock<MockByteEventCallback> lastByteTxCb;
auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(streamBytes);
CHECK_EQ(streamBytes, buf->length());
transport_->writeChain(
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb);
transport_->registerTxCallback(stream, lastByte, &lastByteTxCb);
// when first and last byte TX callbacsk fired, numPacketsTxWithNewData should
// be 1
EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0)))
.Times(1)
.WillOnce(Invoke([&](QuicSocket::ByteEvent /* event */) {
auto info = *transport_->getStreamTransportInfo(stream);
EXPECT_EQ(info.numPacketsTxWithNewData, 1);
}));
EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, lastByte)))
.Times(1)
.WillOnce(Invoke([&](QuicSocket::ByteEvent /* event */) {
auto info = *transport_->getStreamTransportInfo(stream);
EXPECT_EQ(info.numPacketsTxWithNewData, 1);
}));
loopForWrites();
Mock::VerifyAndClearExpectations(&firstByteTxCb);
Mock::VerifyAndClearExpectations(&lastByteTxCb);
}
TEST_F(QuicTransportTest, GetStreamPacketsTxedMultiplePackets) {
auto& conn = transport_->getConnectionState();
conn.transportSettings.writeConnectionDataPacketsLimit = 1;
const uint64_t streamBytes = kDefaultUDPSendPacketLen * 4;
const uint64_t lastByte = streamBytes - 1;
// 20 bytes overhead per packet should be more than enough
const uint64_t firstPacketNearTailByte = kDefaultUDPSendPacketLen - 20;
const uint64_t secondPacketNearHeadByte = kDefaultUDPSendPacketLen;
const uint64_t secondPacketNearTailByte = kDefaultUDPSendPacketLen * 2 - 40;
StrictMock<MockByteEventCallback> firstByteTxCb;
StrictMock<MockByteEventCallback> firstPacketNearTailByteTxCb;
StrictMock<MockByteEventCallback> secondPacketNearHeadByteTxCb;
StrictMock<MockByteEventCallback> secondPacketNearTailByteTxCb;
StrictMock<MockByteEventCallback> lastByteTxCb;
auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(streamBytes);
CHECK_EQ(streamBytes, buf->length());
transport_->writeChain(
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb);
transport_->registerTxCallback(
stream, firstPacketNearTailByte, &firstPacketNearTailByteTxCb);
transport_->registerTxCallback(
stream, secondPacketNearHeadByte, &secondPacketNearHeadByteTxCb);
transport_->registerTxCallback(
stream, secondPacketNearTailByte, &secondPacketNearTailByteTxCb);
transport_->registerTxCallback(stream, lastByte, &lastByteTxCb);
// first byte and first packet last bytes get Txed on first loopForWrites
EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0)))
.Times(1)
.WillOnce(Invoke([&](QuicSocket::ByteEvent /* event */) {
auto info = *transport_->getStreamTransportInfo(stream);
EXPECT_EQ(info.numPacketsTxWithNewData, 1);
}));
EXPECT_CALL(
firstPacketNearTailByteTxCb,
onByteEvent(getTxMatcher(stream, firstPacketNearTailByte)))
.Times(1)
.WillOnce(Invoke([&](QuicSocket::ByteEvent /* even */) {
auto info = *transport_->getStreamTransportInfo(stream);
EXPECT_EQ(info.numPacketsTxWithNewData, 1);
}));
loopForWrites();
Mock::VerifyAndClearExpectations(&firstByteTxCb);
Mock::VerifyAndClearExpectations(&firstPacketNearTailByteTxCb);
// second packet should be send on the second loopForWrites
EXPECT_CALL(
secondPacketNearHeadByteTxCb,
onByteEvent(getTxMatcher(stream, secondPacketNearHeadByte)))
.Times(1)
.WillOnce(Invoke([&](QuicSocket::ByteEvent /* even */) {
auto info = *transport_->getStreamTransportInfo(stream);
EXPECT_EQ(info.numPacketsTxWithNewData, 2);
}));
EXPECT_CALL(
secondPacketNearTailByteTxCb,
onByteEvent(getTxMatcher(stream, secondPacketNearTailByte)))
.Times(1)
.WillOnce(Invoke([&](QuicSocket::ByteEvent /* even */) {
auto info = *transport_->getStreamTransportInfo(stream);
EXPECT_EQ(info.numPacketsTxWithNewData, 2);
}));
loopForWrites();
Mock::VerifyAndClearExpectations(&secondPacketNearHeadByteTxCb);
Mock::VerifyAndClearExpectations(&secondPacketNearTailByteTxCb);
// last byte will be sent on the fifth loopForWrites
EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, lastByte)))
.Times(1)
.WillOnce(Invoke([&](QuicSocket::ByteEvent /* event */) {
auto info = *transport_->getStreamTransportInfo(stream);
EXPECT_EQ(info.numPacketsTxWithNewData, 5);
}));
loopForWrites();
loopForWrites();
loopForWrites();
Mock::VerifyAndClearExpectations(&lastByteTxCb);
}
} // namespace test } // namespace test
} // namespace quic } // namespace quic

View File

@@ -403,6 +403,10 @@ folly::Optional<uint64_t> getLargestDeliverableOffset(
return stream.ackedIntervals.front().end; return stream.ackedIntervals.front().end;
} }
uint64_t getNumPacketsTxWithNewData(const QuicStreamState& stream) {
return stream.numPacketsTxWithNewData;
}
// TODO reap // TODO reap
void cancelHandshakeCryptoStreamRetransmissions(QuicCryptoState& cryptoState) { void cancelHandshakeCryptoStreamRetransmissions(QuicCryptoState& cryptoState) {
// Cancel any retransmissions we might want to do for the crypto stream. // Cancel any retransmissions we might want to do for the crypto stream.

View File

@@ -107,6 +107,12 @@ folly::Optional<uint64_t> getLargestWriteOffsetTxed(
folly::Optional<uint64_t> getLargestDeliverableOffset( folly::Optional<uint64_t> getLargestDeliverableOffset(
const QuicStreamState& stream); const QuicStreamState& stream);
/**
* Get the cumulative number of packets that contains STREAM frame for this
* stream. It does not count retransmissions.
*/
uint64_t getNumPacketsTxWithNewData(const QuicStreamState& stream);
/** /**
* Common functions for merging data into the read buffer for a Quic stream like * Common functions for merging data into the read buffer for a Quic stream like
* object. Callers should provide a connFlowControlVisitor which will be invoked * object. Callers should provide a connFlowControlVisitor which will be invoked

View File

@@ -92,6 +92,10 @@ struct QuicStreamLike {
// Read side eof offset. // Read side eof offset.
folly::Optional<uint64_t> finalReadOffset; folly::Optional<uint64_t> finalReadOffset;
// Current cumulative number of packets sent for this stream. It only counts
// egress packets that contains a *new* STREAM frame for this stream.
uint64_t numPacketsTxWithNewData{0};
/* /*
* Either insert a new entry into the loss buffer, or merge the buffer with * Either insert a new entry into the loss buffer, or merge the buffer with
* an existing entry. * an existing entry.