From ee90db8520e6551d26620f2f9564003d22f13c49 Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Sun, 21 Apr 2024 16:38:06 -0700 Subject: [PATCH] Aggregate QUIC stats callbacks Summary: This reduces the number of stats callbacks when processing multiple packets in rapid succession. Reviewed By: mjoras Differential Revision: D56315022 fbshipit-source-id: 750024301f28b21e3125c144ead6f115706736a4 --- quic/api/QuicPacketScheduler.cpp | 12 ++++--- quic/api/QuicPacketScheduler.h | 8 +++-- quic/api/QuicTransportFunctions.cpp | 30 ++++++++++++------ quic/api/test/QuicPacketSchedulerTest.cpp | 8 ++--- quic/api/test/QuicTransportFunctionsTest.cpp | 5 ++- quic/client/QuicClientTransport.cpp | 33 ++++++++++++++++---- quic/client/QuicClientTransport.h | 3 +- quic/server/state/ServerStateMachine.cpp | 6 +++- quic/state/QuicTransportStatsCallback.h | 25 +++++++++++++++ 9 files changed, 99 insertions(+), 31 deletions(-) diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index 32ea0efc8..0dc743fbf 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -239,6 +239,7 @@ FrameScheduler::FrameScheduler( SchedulingResult FrameScheduler::scheduleFramesForPacket( PacketBuilderInterface&& builder, uint32_t writableBytes) { + size_t shortHeaderPadding = 0; builder.encodePacketHeader(); // We need to keep track of writable bytes after writing header. writableBytes = writableBytes > builder.getHeaderBytes() @@ -322,12 +323,13 @@ SchedulingResult FrameScheduler::scheduleFramesForPacket( for (size_t i = 0; i < paddingIncrement; i++) { writeFrame(PaddingFrame(), builder); } - QUIC_STATS(conn_.statsCallback, onShortHeaderPadding, paddingIncrement); + shortHeaderPadding = paddingIncrement; } } } - return SchedulingResult(folly::none, std::move(builder).buildPacket()); + return SchedulingResult( + folly::none, std::move(builder).buildPacket(), shortHeaderPadding); } void FrameScheduler::writeNextAcks(PacketBuilderInterface& builder) { @@ -942,7 +944,9 @@ SchedulingResult CloningScheduler::scheduleFramesForPacket( auto rebuildResult = rebuilder.rebuildFromPacket(outstandingPacket); if (rebuildResult) { return SchedulingResult( - std::move(rebuildResult), std::move(*internalBuilder).buildPacket()); + std::move(rebuildResult), + std::move(*internalBuilder).buildPacket(), + 0); } else if ( conn_.transportSettings.dataPathType == DataPathType::ContinuousMemory) { @@ -961,7 +965,7 @@ SchedulingResult CloningScheduler::scheduleFramesForPacket( buf->trimEnd(buf->length() - prevSize); } } - return SchedulingResult(folly::none, folly::none); + return SchedulingResult(folly::none, folly::none, 0); } folly::StringPiece CloningScheduler::name() const { diff --git a/quic/api/QuicPacketScheduler.h b/quic/api/QuicPacketScheduler.h index 77b2e4e48..d11bd5e4b 100644 --- a/quic/api/QuicPacketScheduler.h +++ b/quic/api/QuicPacketScheduler.h @@ -25,11 +25,15 @@ namespace quic { struct SchedulingResult { folly::Optional packetEvent; folly::Optional packet; + size_t shortHeaderPadding; explicit SchedulingResult( folly::Optional packetEventIn, - folly::Optional packetIn) - : packetEvent(std::move(packetEventIn)), packet(std::move(packetIn)) {} + folly::Optional packetIn, + size_t shortHeaderPaddingIn = 0) + : packetEvent(std::move(packetEventIn)), + packet(std::move(packetIn)), + shortHeaderPadding(shortHeaderPaddingIn) {} }; /** diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 29cd1456a..8b98f0a2a 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -308,11 +308,6 @@ DataPathResult continuousMemoryBuildScheduleEncrypt( } // TODO: I think we should add an API that doesn't need a buffer. bool ret = ioBufBatch.write(nullptr /* no need to pass buf */, encodedSize); - // update stats and connection - if (ret) { - QUIC_STATS(connection.statsCallback, onWrite, encodedSize); - QUIC_STATS(connection.statsCallback, onPacketSent); - } return DataPathResult::makeWriteResult( ret, std::move(result), encodedSize, encodedBodySize); } @@ -384,11 +379,6 @@ DataPathResult iobufChainBasedBuildScheduleEncrypt( << " encodedBodySize=" << encodedBodySize; } bool ret = ioBufBatch.write(std::move(packetBuf), encodedSize); - if (ret) { - // update stats and connection - QUIC_STATS(connection.statsCallback, onWrite, encodedSize); - QUIC_STATS(connection.statsCallback, onPacketSent); - } return DataPathResult::makeWriteResult( ret, std::move(result), encodedSize, encodedBodySize); } @@ -1522,6 +1512,22 @@ WriteQuicDataResult writeConnectionDataToSocket( : connection.transportSettings.maxBatchSize; uint64_t bytesWritten = 0; + uint64_t shortHeaderPadding = 0; + uint64_t shortHeaderPaddingCount = 0; + SCOPE_EXIT { + auto nSent = ioBufBatch.getPktSent(); + if (nSent > 0) { + QUIC_STATS(connection.statsCallback, onPacketsSent, nSent); + QUIC_STATS(connection.statsCallback, onWrite, bytesWritten); + if (shortHeaderPadding > 0) { + QUIC_STATS( + connection.statsCallback, + onShortHeaderPaddingBatch, + shortHeaderPaddingCount, + shortHeaderPadding); + } + } + }; while (scheduler.hasData() && ioBufBatch.getPktSent() < packetLimit && ((ioBufBatch.getPktSent() < batchSize) || @@ -1565,6 +1571,10 @@ WriteQuicDataResult writeConnectionDataToSocket( // pretend write was also successful but packet is lost somewhere in the // network. bytesWritten += ret.encodedSize; + if (ret.result && ret.result->shortHeaderPadding > 0) { + shortHeaderPaddingCount++; + shortHeaderPadding += ret.result->shortHeaderPadding; + } auto& result = ret.result; updateConnection( diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 3cdf9875f..09002a921 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -2390,12 +2390,12 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingWithSpaceForPadding) { NiceMock quicStats; conn.statsCallback = &quicStats; - EXPECT_CALL(quicStats, onShortHeaderPadding(_)).Times(1); auto result1 = scheduler.scheduleFramesForPacket( std::move(builder1), conn.udpSendPacketLen); - EXPECT_CALL(quicStats, onShortHeaderPadding(_)).Times(1); + EXPECT_GT(result1.shortHeaderPadding, 0); auto result2 = scheduler.scheduleFramesForPacket( std::move(builder2), conn.udpSendPacketLen); + EXPECT_GT(result2.shortHeaderPadding, 0); auto headerLength1 = result1.packet->header.computeChainDataLength(); auto bodyLength1 = result1.packet->body.computeChainDataLength(); @@ -2451,9 +2451,9 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingNearMaxPacketLength) { NiceMock quicStats; conn.statsCallback = &quicStats; - EXPECT_CALL(quicStats, onShortHeaderPadding(_)).Times(1); auto result = scheduler.scheduleFramesForPacket( std::move(builder), conn.udpSendPacketLen); + EXPECT_GT(result.shortHeaderPadding, 0); auto headerLength = result.packet->header.computeChainDataLength(); auto bodyLength = result.packet->body.computeChainDataLength(); @@ -2506,9 +2506,9 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingMaxPacketLength) { NiceMock quicStats; conn.statsCallback = &quicStats; - EXPECT_CALL(quicStats, onShortHeaderPadding(_)).Times(1); auto result = scheduler.scheduleFramesForPacket( std::move(builder), conn.udpSendPacketLen); + EXPECT_EQ(result.shortHeaderPadding, 0); auto headerLength = result.packet->header.computeChainDataLength(); auto bodyLength = result.packet->body.computeChainDataLength(); diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index 8fe05e55e..6d4af3dfa 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -2715,8 +2715,7 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketLimitTest) { })); EXPECT_CALL(*rawCongestionController, onPacketSent(_)) .Times(kDefaultWriteConnectionDataPacketLimit * 2); - EXPECT_CALL(*quicStats_, onWrite(_)) - .Times(kDefaultWriteConnectionDataPacketLimit * 2); + EXPECT_CALL(*quicStats_, onWrite(_)).Times(1); res = writeQuicDataToSocket( *rawSocket, *conn, @@ -2926,7 +2925,7 @@ TEST_F(QuicTransportFunctionsTest, WriteBlockedFrameWhenBlocked) { // Artificially Block the stream stream1->flowControlState.peerAdvertisedMaxOffset = 10; // writes blocked frame in additionally - EXPECT_CALL(*quicStats_, onWrite(_)).Times(2); + EXPECT_CALL(*quicStats_, onWrite(_)).Times(1); writeQuicDataToSocket( *rawSocket, *conn, diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index 7d271985a..d57b9ec55 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -1167,6 +1167,8 @@ void QuicClientTransport::recvMsg( NetworkData& networkData, folly::Optional& server, size_t& totalData) { + uint32_t totalPacketLen = 0; + uint32_t totalPackets = 0; for (int packetNum = 0; packetNum < numPackets; ++packetNum) { // We create 1 buffer per packet so that it is not shared, this enables // us to decrypt in place. If the fizz decrypt api could decrypt in-place @@ -1281,8 +1283,11 @@ void QuicClientTransport::recvMsg( } else { networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer))); } - trackDatagramReceived(bytesRead); + maybeQlogDatagram(bytesRead); + totalPackets++; + totalPacketLen += bytesRead; } + trackDatagramsReceived(totalPackets, totalPacketLen); } void QuicClientTransport::recvMmsg( @@ -1357,6 +1362,8 @@ void QuicClientTransport::recvMmsg( } CHECK_LE(numMsgsRecvd, numPackets); + uint32_t totalPacketLen = 0; + uint32_t totalPackets = 0; for (uint16_t i = 0; i < static_cast(numMsgsRecvd); ++i) { auto& addr = recvmmsgStorage_.impl_[i].addr; auto& readBuffer = recvmmsgStorage_.impl_[i].readBuffer; @@ -1425,8 +1432,11 @@ void QuicClientTransport::recvMmsg( networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer))); } - trackDatagramReceived(bytesRead); + maybeQlogDatagram(bytesRead); + totalPackets++; + totalPacketLen += bytesRead; } + trackDatagramsReceived(totalPackets, totalPacketLen); } void QuicClientTransport::onNotifyDataAvailable( @@ -1441,6 +1451,8 @@ void QuicClientTransport::onNotifyDataAvailable( networkData.reserve(numPackets); size_t totalData = 0; folly::Optional server; + uint32_t totalPacketLen = 0; + uint32_t totalPackets = 0; if (conn_->transportSettings.shouldUseWrapperRecvmmsgForBatchRecv) { const auto result = sock.recvmmsgNetworkData( @@ -1451,8 +1463,12 @@ void QuicClientTransport::onNotifyDataAvailable( if (!packet.buf) { continue; } - trackDatagramReceived(packet.buf->computeChainDataLength()); + auto len = packet.buf->computeChainDataLength(); + maybeQlogDatagram(len); + totalPackets++; + totalPacketLen += len; } + trackDatagramsReceived(totalPackets, totalPacketLen); // Propagate errors // TODO(bschlinker): Investigate generalization of loopDetectorCallback @@ -1713,12 +1729,17 @@ void QuicClientTransport::setTransportStatsCallback( } } -void QuicClientTransport::trackDatagramReceived(size_t len) { +void QuicClientTransport::maybeQlogDatagram(size_t len) { if (conn_->qLogger) { conn_->qLogger->addDatagramReceived(len); } - QUIC_STATS(statsCallback_, onPacketReceived); - QUIC_STATS(statsCallback_, onRead, len); +} + +void QuicClientTransport::trackDatagramsReceived( + uint32_t totalPackets, + uint32_t totalPacketLen) { + QUIC_STATS(statsCallback_, onPacketsReceived, totalPackets); + QUIC_STATS(statsCallback_, onRead, totalPacketLen); } void QuicClientTransport::maybeSendTransportKnobs() { diff --git a/quic/client/QuicClientTransport.h b/quic/client/QuicClientTransport.h index 46a099a9a..97d020e06 100644 --- a/quic/client/QuicClientTransport.h +++ b/quic/client/QuicClientTransport.h @@ -317,7 +317,8 @@ class QuicClientTransport }; void adjustGROBuffers(); - void trackDatagramReceived(size_t len); + void maybeQlogDatagram(size_t len); + void trackDatagramsReceived(uint32_t totalPackets, uint32_t totalPacketLen); /** * Send quic transport knobs defined by transportSettings.knobs to peer. This diff --git a/quic/server/state/ServerStateMachine.cpp b/quic/server/state/ServerStateMachine.cpp index 75e4171e5..38712f5dd 100644 --- a/quic/server/state/ServerStateMachine.cpp +++ b/quic/server/state/ServerStateMachine.cpp @@ -848,6 +848,7 @@ void onServerReadDataFromOpen( } BufQueue udpData; udpData.append(std::move(readData.udpPacket.buf)); + uint64_t processedPacketsTotal = 0; for (uint16_t processedPackets = 0; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets; processedPackets++) { @@ -1367,7 +1368,10 @@ void onServerReadDataFromOpen( conn.readCodec->setInitialHeaderCipher(nullptr); implicitAckCryptoStream(conn, EncryptionLevel::Initial); } - QUIC_STATS(conn.statsCallback, onPacketProcessed); + processedPacketsTotal++; + } + if (processedPacketsTotal > 0) { + QUIC_STATS(conn.statsCallback, onPacketsProcessed, processedPacketsTotal); } VLOG_IF(4, !udpData.empty()) << "Leaving " << udpData.chainLength() diff --git a/quic/state/QuicTransportStatsCallback.h b/quic/state/QuicTransportStatsCallback.h index 25ffe6c22..13ae5748a 100644 --- a/quic/state/QuicTransportStatsCallback.h +++ b/quic/state/QuicTransportStatsCallback.h @@ -41,14 +41,32 @@ class QuicTransportStatsCallback { // packet level metrics virtual void onPacketReceived() = 0; + virtual void onPacketsReceived(uint32_t n) { + for (uint32_t i = 0; i < n; i++) { + onPacketReceived(); + } + } + virtual void onDuplicatedPacketReceived() = 0; virtual void onOutOfOrderPacketReceived() = 0; virtual void onPacketProcessed() = 0; + virtual void onPacketsProcessed(uint32_t n) { + for (uint32_t i = 0; i < n; i++) { + onPacketProcessed(); + } + } + virtual void onPacketSent() = 0; + virtual void onPacketsSent(uint32_t n) { + for (uint32_t i = 0; i < n; i++) { + onPacketSent(); + } + } + virtual void onDSRPacketSent(size_t pktSize) = 0; virtual void onPacketRetransmission() = 0; @@ -155,6 +173,13 @@ class QuicTransportStatsCallback { virtual void onShortHeaderPadding(size_t padSize) = 0; + virtual void onShortHeaderPaddingBatch(uint32_t n, size_t padSize) { + for (uint32_t i = 0; i + 1 < n; i++) { + onShortHeaderPadding(0); + } + onShortHeaderPadding(padSize); + } + virtual void onPacerTimerLagged() = 0; virtual void onPeerMaxUniStreamsLimitSaturated() = 0;