1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-01 01:44:22 +03:00

Aggregate QUIC stats callbacks

Summary: This reduces the number of stats callbacks when processing multiple packets in rapid succession.

Reviewed By: mjoras

Differential Revision: D56315022

fbshipit-source-id: 750024301f28b21e3125c144ead6f115706736a4
This commit is contained in:
Alan Frindell
2024-04-21 16:38:06 -07:00
committed by Facebook GitHub Bot
parent 406c24908d
commit ee90db8520
9 changed files with 99 additions and 31 deletions

View File

@ -239,6 +239,7 @@ FrameScheduler::FrameScheduler(
SchedulingResult FrameScheduler::scheduleFramesForPacket( SchedulingResult FrameScheduler::scheduleFramesForPacket(
PacketBuilderInterface&& builder, PacketBuilderInterface&& builder,
uint32_t writableBytes) { uint32_t writableBytes) {
size_t shortHeaderPadding = 0;
builder.encodePacketHeader(); builder.encodePacketHeader();
// We need to keep track of writable bytes after writing header. // We need to keep track of writable bytes after writing header.
writableBytes = writableBytes > builder.getHeaderBytes() writableBytes = writableBytes > builder.getHeaderBytes()
@ -322,12 +323,13 @@ SchedulingResult FrameScheduler::scheduleFramesForPacket(
for (size_t i = 0; i < paddingIncrement; i++) { for (size_t i = 0; i < paddingIncrement; i++) {
writeFrame(PaddingFrame(), builder); writeFrame(PaddingFrame(), builder);
} }
QUIC_STATS(conn_.statsCallback, onShortHeaderPadding, paddingIncrement); shortHeaderPadding = paddingIncrement;
} }
} }
} }
return SchedulingResult(folly::none, std::move(builder).buildPacket()); return SchedulingResult(
folly::none, std::move(builder).buildPacket(), shortHeaderPadding);
} }
void FrameScheduler::writeNextAcks(PacketBuilderInterface& builder) { void FrameScheduler::writeNextAcks(PacketBuilderInterface& builder) {
@ -942,7 +944,9 @@ SchedulingResult CloningScheduler::scheduleFramesForPacket(
auto rebuildResult = rebuilder.rebuildFromPacket(outstandingPacket); auto rebuildResult = rebuilder.rebuildFromPacket(outstandingPacket);
if (rebuildResult) { if (rebuildResult) {
return SchedulingResult( return SchedulingResult(
std::move(rebuildResult), std::move(*internalBuilder).buildPacket()); std::move(rebuildResult),
std::move(*internalBuilder).buildPacket(),
0);
} else if ( } else if (
conn_.transportSettings.dataPathType == conn_.transportSettings.dataPathType ==
DataPathType::ContinuousMemory) { DataPathType::ContinuousMemory) {
@ -961,7 +965,7 @@ SchedulingResult CloningScheduler::scheduleFramesForPacket(
buf->trimEnd(buf->length() - prevSize); buf->trimEnd(buf->length() - prevSize);
} }
} }
return SchedulingResult(folly::none, folly::none); return SchedulingResult(folly::none, folly::none, 0);
} }
folly::StringPiece CloningScheduler::name() const { folly::StringPiece CloningScheduler::name() const {

View File

@ -25,11 +25,15 @@ namespace quic {
struct SchedulingResult { struct SchedulingResult {
folly::Optional<PacketEvent> packetEvent; folly::Optional<PacketEvent> packetEvent;
folly::Optional<PacketBuilderInterface::Packet> packet; folly::Optional<PacketBuilderInterface::Packet> packet;
size_t shortHeaderPadding;
explicit SchedulingResult( explicit SchedulingResult(
folly::Optional<PacketEvent> packetEventIn, folly::Optional<PacketEvent> packetEventIn,
folly::Optional<PacketBuilderInterface::Packet> packetIn) folly::Optional<PacketBuilderInterface::Packet> packetIn,
: packetEvent(std::move(packetEventIn)), packet(std::move(packetIn)) {} size_t shortHeaderPaddingIn = 0)
: packetEvent(std::move(packetEventIn)),
packet(std::move(packetIn)),
shortHeaderPadding(shortHeaderPaddingIn) {}
}; };
/** /**

View File

@ -308,11 +308,6 @@ DataPathResult continuousMemoryBuildScheduleEncrypt(
} }
// TODO: I think we should add an API that doesn't need a buffer. // TODO: I think we should add an API that doesn't need a buffer.
bool ret = ioBufBatch.write(nullptr /* no need to pass buf */, encodedSize); bool ret = ioBufBatch.write(nullptr /* no need to pass buf */, encodedSize);
// update stats and connection
if (ret) {
QUIC_STATS(connection.statsCallback, onWrite, encodedSize);
QUIC_STATS(connection.statsCallback, onPacketSent);
}
return DataPathResult::makeWriteResult( return DataPathResult::makeWriteResult(
ret, std::move(result), encodedSize, encodedBodySize); ret, std::move(result), encodedSize, encodedBodySize);
} }
@ -384,11 +379,6 @@ DataPathResult iobufChainBasedBuildScheduleEncrypt(
<< " encodedBodySize=" << encodedBodySize; << " encodedBodySize=" << encodedBodySize;
} }
bool ret = ioBufBatch.write(std::move(packetBuf), encodedSize); bool ret = ioBufBatch.write(std::move(packetBuf), encodedSize);
if (ret) {
// update stats and connection
QUIC_STATS(connection.statsCallback, onWrite, encodedSize);
QUIC_STATS(connection.statsCallback, onPacketSent);
}
return DataPathResult::makeWriteResult( return DataPathResult::makeWriteResult(
ret, std::move(result), encodedSize, encodedBodySize); ret, std::move(result), encodedSize, encodedBodySize);
} }
@ -1522,6 +1512,22 @@ WriteQuicDataResult writeConnectionDataToSocket(
: connection.transportSettings.maxBatchSize; : connection.transportSettings.maxBatchSize;
uint64_t bytesWritten = 0; uint64_t bytesWritten = 0;
uint64_t shortHeaderPadding = 0;
uint64_t shortHeaderPaddingCount = 0;
SCOPE_EXIT {
auto nSent = ioBufBatch.getPktSent();
if (nSent > 0) {
QUIC_STATS(connection.statsCallback, onPacketsSent, nSent);
QUIC_STATS(connection.statsCallback, onWrite, bytesWritten);
if (shortHeaderPadding > 0) {
QUIC_STATS(
connection.statsCallback,
onShortHeaderPaddingBatch,
shortHeaderPaddingCount,
shortHeaderPadding);
}
}
};
while (scheduler.hasData() && ioBufBatch.getPktSent() < packetLimit && while (scheduler.hasData() && ioBufBatch.getPktSent() < packetLimit &&
((ioBufBatch.getPktSent() < batchSize) || ((ioBufBatch.getPktSent() < batchSize) ||
@ -1565,6 +1571,10 @@ WriteQuicDataResult writeConnectionDataToSocket(
// pretend write was also successful but packet is lost somewhere in the // pretend write was also successful but packet is lost somewhere in the
// network. // network.
bytesWritten += ret.encodedSize; bytesWritten += ret.encodedSize;
if (ret.result && ret.result->shortHeaderPadding > 0) {
shortHeaderPaddingCount++;
shortHeaderPadding += ret.result->shortHeaderPadding;
}
auto& result = ret.result; auto& result = ret.result;
updateConnection( updateConnection(

View File

@ -2390,12 +2390,12 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingWithSpaceForPadding) {
NiceMock<MockQuicStats> quicStats; NiceMock<MockQuicStats> quicStats;
conn.statsCallback = &quicStats; conn.statsCallback = &quicStats;
EXPECT_CALL(quicStats, onShortHeaderPadding(_)).Times(1);
auto result1 = scheduler.scheduleFramesForPacket( auto result1 = scheduler.scheduleFramesForPacket(
std::move(builder1), conn.udpSendPacketLen); std::move(builder1), conn.udpSendPacketLen);
EXPECT_CALL(quicStats, onShortHeaderPadding(_)).Times(1); EXPECT_GT(result1.shortHeaderPadding, 0);
auto result2 = scheduler.scheduleFramesForPacket( auto result2 = scheduler.scheduleFramesForPacket(
std::move(builder2), conn.udpSendPacketLen); std::move(builder2), conn.udpSendPacketLen);
EXPECT_GT(result2.shortHeaderPadding, 0);
auto headerLength1 = result1.packet->header.computeChainDataLength(); auto headerLength1 = result1.packet->header.computeChainDataLength();
auto bodyLength1 = result1.packet->body.computeChainDataLength(); auto bodyLength1 = result1.packet->body.computeChainDataLength();
@ -2451,9 +2451,9 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingNearMaxPacketLength) {
NiceMock<MockQuicStats> quicStats; NiceMock<MockQuicStats> quicStats;
conn.statsCallback = &quicStats; conn.statsCallback = &quicStats;
EXPECT_CALL(quicStats, onShortHeaderPadding(_)).Times(1);
auto result = scheduler.scheduleFramesForPacket( auto result = scheduler.scheduleFramesForPacket(
std::move(builder), conn.udpSendPacketLen); std::move(builder), conn.udpSendPacketLen);
EXPECT_GT(result.shortHeaderPadding, 0);
auto headerLength = result.packet->header.computeChainDataLength(); auto headerLength = result.packet->header.computeChainDataLength();
auto bodyLength = result.packet->body.computeChainDataLength(); auto bodyLength = result.packet->body.computeChainDataLength();
@ -2506,9 +2506,9 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingMaxPacketLength) {
NiceMock<MockQuicStats> quicStats; NiceMock<MockQuicStats> quicStats;
conn.statsCallback = &quicStats; conn.statsCallback = &quicStats;
EXPECT_CALL(quicStats, onShortHeaderPadding(_)).Times(1);
auto result = scheduler.scheduleFramesForPacket( auto result = scheduler.scheduleFramesForPacket(
std::move(builder), conn.udpSendPacketLen); std::move(builder), conn.udpSendPacketLen);
EXPECT_EQ(result.shortHeaderPadding, 0);
auto headerLength = result.packet->header.computeChainDataLength(); auto headerLength = result.packet->header.computeChainDataLength();
auto bodyLength = result.packet->body.computeChainDataLength(); auto bodyLength = result.packet->body.computeChainDataLength();

View File

@ -2715,8 +2715,7 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketLimitTest) {
})); }));
EXPECT_CALL(*rawCongestionController, onPacketSent(_)) EXPECT_CALL(*rawCongestionController, onPacketSent(_))
.Times(kDefaultWriteConnectionDataPacketLimit * 2); .Times(kDefaultWriteConnectionDataPacketLimit * 2);
EXPECT_CALL(*quicStats_, onWrite(_)) EXPECT_CALL(*quicStats_, onWrite(_)).Times(1);
.Times(kDefaultWriteConnectionDataPacketLimit * 2);
res = writeQuicDataToSocket( res = writeQuicDataToSocket(
*rawSocket, *rawSocket,
*conn, *conn,
@ -2926,7 +2925,7 @@ TEST_F(QuicTransportFunctionsTest, WriteBlockedFrameWhenBlocked) {
// Artificially Block the stream // Artificially Block the stream
stream1->flowControlState.peerAdvertisedMaxOffset = 10; stream1->flowControlState.peerAdvertisedMaxOffset = 10;
// writes blocked frame in additionally // writes blocked frame in additionally
EXPECT_CALL(*quicStats_, onWrite(_)).Times(2); EXPECT_CALL(*quicStats_, onWrite(_)).Times(1);
writeQuicDataToSocket( writeQuicDataToSocket(
*rawSocket, *rawSocket,
*conn, *conn,

View File

@ -1167,6 +1167,8 @@ void QuicClientTransport::recvMsg(
NetworkData& networkData, NetworkData& networkData,
folly::Optional<folly::SocketAddress>& server, folly::Optional<folly::SocketAddress>& server,
size_t& totalData) { size_t& totalData) {
uint32_t totalPacketLen = 0;
uint32_t totalPackets = 0;
for (int packetNum = 0; packetNum < numPackets; ++packetNum) { for (int packetNum = 0; packetNum < numPackets; ++packetNum) {
// We create 1 buffer per packet so that it is not shared, this enables // We create 1 buffer per packet so that it is not shared, this enables
// us to decrypt in place. If the fizz decrypt api could decrypt in-place // us to decrypt in place. If the fizz decrypt api could decrypt in-place
@ -1281,8 +1283,11 @@ void QuicClientTransport::recvMsg(
} else { } else {
networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer))); networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer)));
} }
trackDatagramReceived(bytesRead); maybeQlogDatagram(bytesRead);
totalPackets++;
totalPacketLen += bytesRead;
} }
trackDatagramsReceived(totalPackets, totalPacketLen);
} }
void QuicClientTransport::recvMmsg( void QuicClientTransport::recvMmsg(
@ -1357,6 +1362,8 @@ void QuicClientTransport::recvMmsg(
} }
CHECK_LE(numMsgsRecvd, numPackets); CHECK_LE(numMsgsRecvd, numPackets);
uint32_t totalPacketLen = 0;
uint32_t totalPackets = 0;
for (uint16_t i = 0; i < static_cast<uint16_t>(numMsgsRecvd); ++i) { for (uint16_t i = 0; i < static_cast<uint16_t>(numMsgsRecvd); ++i) {
auto& addr = recvmmsgStorage_.impl_[i].addr; auto& addr = recvmmsgStorage_.impl_[i].addr;
auto& readBuffer = recvmmsgStorage_.impl_[i].readBuffer; auto& readBuffer = recvmmsgStorage_.impl_[i].readBuffer;
@ -1425,8 +1432,11 @@ void QuicClientTransport::recvMmsg(
networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer))); networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer)));
} }
trackDatagramReceived(bytesRead); maybeQlogDatagram(bytesRead);
totalPackets++;
totalPacketLen += bytesRead;
} }
trackDatagramsReceived(totalPackets, totalPacketLen);
} }
void QuicClientTransport::onNotifyDataAvailable( void QuicClientTransport::onNotifyDataAvailable(
@ -1441,6 +1451,8 @@ void QuicClientTransport::onNotifyDataAvailable(
networkData.reserve(numPackets); networkData.reserve(numPackets);
size_t totalData = 0; size_t totalData = 0;
folly::Optional<folly::SocketAddress> server; folly::Optional<folly::SocketAddress> server;
uint32_t totalPacketLen = 0;
uint32_t totalPackets = 0;
if (conn_->transportSettings.shouldUseWrapperRecvmmsgForBatchRecv) { if (conn_->transportSettings.shouldUseWrapperRecvmmsgForBatchRecv) {
const auto result = sock.recvmmsgNetworkData( const auto result = sock.recvmmsgNetworkData(
@ -1451,8 +1463,12 @@ void QuicClientTransport::onNotifyDataAvailable(
if (!packet.buf) { if (!packet.buf) {
continue; continue;
} }
trackDatagramReceived(packet.buf->computeChainDataLength()); auto len = packet.buf->computeChainDataLength();
maybeQlogDatagram(len);
totalPackets++;
totalPacketLen += len;
} }
trackDatagramsReceived(totalPackets, totalPacketLen);
// Propagate errors // Propagate errors
// TODO(bschlinker): Investigate generalization of loopDetectorCallback // TODO(bschlinker): Investigate generalization of loopDetectorCallback
@ -1713,12 +1729,17 @@ void QuicClientTransport::setTransportStatsCallback(
} }
} }
void QuicClientTransport::trackDatagramReceived(size_t len) { void QuicClientTransport::maybeQlogDatagram(size_t len) {
if (conn_->qLogger) { if (conn_->qLogger) {
conn_->qLogger->addDatagramReceived(len); conn_->qLogger->addDatagramReceived(len);
} }
QUIC_STATS(statsCallback_, onPacketReceived); }
QUIC_STATS(statsCallback_, onRead, len);
void QuicClientTransport::trackDatagramsReceived(
uint32_t totalPackets,
uint32_t totalPacketLen) {
QUIC_STATS(statsCallback_, onPacketsReceived, totalPackets);
QUIC_STATS(statsCallback_, onRead, totalPacketLen);
} }
void QuicClientTransport::maybeSendTransportKnobs() { void QuicClientTransport::maybeSendTransportKnobs() {

View File

@ -317,7 +317,8 @@ class QuicClientTransport
}; };
void adjustGROBuffers(); void adjustGROBuffers();
void trackDatagramReceived(size_t len); void maybeQlogDatagram(size_t len);
void trackDatagramsReceived(uint32_t totalPackets, uint32_t totalPacketLen);
/** /**
* Send quic transport knobs defined by transportSettings.knobs to peer. This * Send quic transport knobs defined by transportSettings.knobs to peer. This

View File

@ -848,6 +848,7 @@ void onServerReadDataFromOpen(
} }
BufQueue udpData; BufQueue udpData;
udpData.append(std::move(readData.udpPacket.buf)); udpData.append(std::move(readData.udpPacket.buf));
uint64_t processedPacketsTotal = 0;
for (uint16_t processedPackets = 0; for (uint16_t processedPackets = 0;
!udpData.empty() && processedPackets < kMaxNumCoalescedPackets; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets;
processedPackets++) { processedPackets++) {
@ -1367,7 +1368,10 @@ void onServerReadDataFromOpen(
conn.readCodec->setInitialHeaderCipher(nullptr); conn.readCodec->setInitialHeaderCipher(nullptr);
implicitAckCryptoStream(conn, EncryptionLevel::Initial); implicitAckCryptoStream(conn, EncryptionLevel::Initial);
} }
QUIC_STATS(conn.statsCallback, onPacketProcessed); processedPacketsTotal++;
}
if (processedPacketsTotal > 0) {
QUIC_STATS(conn.statsCallback, onPacketsProcessed, processedPacketsTotal);
} }
VLOG_IF(4, !udpData.empty()) VLOG_IF(4, !udpData.empty())
<< "Leaving " << udpData.chainLength() << "Leaving " << udpData.chainLength()

View File

@ -41,14 +41,32 @@ class QuicTransportStatsCallback {
// packet level metrics // packet level metrics
virtual void onPacketReceived() = 0; virtual void onPacketReceived() = 0;
virtual void onPacketsReceived(uint32_t n) {
for (uint32_t i = 0; i < n; i++) {
onPacketReceived();
}
}
virtual void onDuplicatedPacketReceived() = 0; virtual void onDuplicatedPacketReceived() = 0;
virtual void onOutOfOrderPacketReceived() = 0; virtual void onOutOfOrderPacketReceived() = 0;
virtual void onPacketProcessed() = 0; virtual void onPacketProcessed() = 0;
virtual void onPacketsProcessed(uint32_t n) {
for (uint32_t i = 0; i < n; i++) {
onPacketProcessed();
}
}
virtual void onPacketSent() = 0; virtual void onPacketSent() = 0;
virtual void onPacketsSent(uint32_t n) {
for (uint32_t i = 0; i < n; i++) {
onPacketSent();
}
}
virtual void onDSRPacketSent(size_t pktSize) = 0; virtual void onDSRPacketSent(size_t pktSize) = 0;
virtual void onPacketRetransmission() = 0; virtual void onPacketRetransmission() = 0;
@ -155,6 +173,13 @@ class QuicTransportStatsCallback {
virtual void onShortHeaderPadding(size_t padSize) = 0; virtual void onShortHeaderPadding(size_t padSize) = 0;
virtual void onShortHeaderPaddingBatch(uint32_t n, size_t padSize) {
for (uint32_t i = 0; i + 1 < n; i++) {
onShortHeaderPadding(0);
}
onShortHeaderPadding(padSize);
}
virtual void onPacerTimerLagged() = 0; virtual void onPacerTimerLagged() = 0;
virtual void onPeerMaxUniStreamsLimitSaturated() = 0; virtual void onPeerMaxUniStreamsLimitSaturated() = 0;