diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 730a3b99b..f1e4ba51e 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -396,6 +396,9 @@ void updateConnection( conn.congestionController->getCongestionWindow()); } } + if (conn.pacer && !pureAck) { + conn.pacer->onPacketSent(); + } if (pkt.isHandshake) { ++conn.outstandingHandshakePacketsCount; conn.lossState.lastHandshakePacketSentTime = pkt.time; diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index dff3af0a4..f8afe4085 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -743,6 +743,9 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithPureAck) { auto conn = createConn(); conn->qLogger = std::make_shared(); auto packet = buildEmptyPacket(*conn, PacketNumberSpace::Handshake); + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn->pacer = std::move(mockPacer); auto mockCongestionController = std::make_unique(); auto rawController = mockCongestionController.get(); conn->congestionController = std::move(mockCongestionController); @@ -751,6 +754,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithPureAck) { ackFrame.ackBlocks.insert(10); packet.packet.frames.push_back(std::move(ackFrame)); EXPECT_CALL(*rawController, onPacketSent(_)).Times(0); + EXPECT_CALL(*rawPacer, onPacketSent()).Times(0); updateConnection( *conn, folly::none, packet.packet, TimePoint(), getEncodedSize(packet)); EXPECT_EQ(1, conn->outstandingPackets.size()); @@ -986,6 +990,34 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketWithCC) { conn->transportSettings.writeConnectionDataPacketsLimit); } +TEST_F(QuicTransportFunctionsTest, WriteQuicdataToSocketWithPacer) { + auto conn = createConn(); + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn->pacer = 
std::move(mockPacer); + + EventBase evb; + auto socket = std::make_unique(&evb); + auto rawSocket = socket.get(); + + auto stream1 = conn->streamManager->createNextBidirectionalStream().value(); + auto buf = + IOBuf::copyBuffer("0123456789012012345678901201234567890120123456789012"); + writeDataToQuicStream(*stream1, buf->clone(), true); + + EXPECT_CALL(*rawPacer, onPacketSent()).Times(1); + EXPECT_CALL(*transportInfoCb_, onWrite(_)); + writeQuicDataToSocket( + *rawSocket, + *conn, + *conn->clientConnectionId, + *conn->serverConnectionId, + *aead, + *headerCipher, + getVersion(*conn), + conn->transportSettings.writeConnectionDataPacketsLimit); +} + TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketLimitTest) { auto conn = createConn(); auto mockCongestionController = std::make_unique(); diff --git a/quic/congestion_control/Bbr.cpp b/quic/congestion_control/Bbr.cpp index d2e3a32e2..52b87ae14 100644 --- a/quic/congestion_control/Bbr.cpp +++ b/quic/congestion_control/Bbr.cpp @@ -158,6 +158,9 @@ void BbrCongestionController::onPacketAckOrLoss( } if (lossEvent) { onPacketLoss(*lossEvent); + if (conn_.pacer) { + conn_.pacer->onPacketsLoss(); + } } if (ackEvent && ackEvent->largestAckedPacket.hasValue()) { CHECK(!ackEvent->ackedPackets.empty()); diff --git a/quic/congestion_control/Copa.cpp b/quic/congestion_control/Copa.cpp index 798750fdc..2b416c268 100644 --- a/quic/congestion_control/Copa.cpp +++ b/quic/congestion_control/Copa.cpp @@ -127,6 +127,9 @@ void Copa::onPacketAckOrLoss( folly::Optional loss) { if (loss) { onPacketLoss(*loss); + if (conn_.pacer) { + conn_.pacer->onPacketsLoss(); + } QUIC_TRACE(copa_loss, conn_, cwndBytes_, bytesInFlight_); } if (ack && ack->largestAckedPacket.hasValue()) { diff --git a/quic/congestion_control/NewReno.cpp b/quic/congestion_control/NewReno.cpp index f9ad28e44..5c61fd3e8 100644 --- a/quic/congestion_control/NewReno.cpp +++ b/quic/congestion_control/NewReno.cpp @@ -89,6 +89,8 @@ void NewReno::onPacketAckOrLoss( 
folly::Optional lossEvent) { if (lossEvent) { onPacketLoss(*lossEvent); + // When we start to support pacing in NewReno, we need to call onPacketsLoss + // on the pacer when there is loss. } if (ackEvent && ackEvent->largestAckedPacket.hasValue()) { onAckEvent(*ackEvent); diff --git a/quic/congestion_control/Pacer.cpp b/quic/congestion_control/Pacer.cpp index 10fc7b5b6..bcbb8eaab 100644 --- a/quic/congestion_control/Pacer.cpp +++ b/quic/congestion_control/Pacer.cpp @@ -20,8 +20,8 @@ DefaultPacer::DefaultPacer( minCwndInMss_(minCwndInMss), batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), pacingRateCalculator_(calculatePacingRate), - cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit) { -} + cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), + tokens_(conn.transportSettings.writeConnectionDataPacketsLimit) {} // TODO: we choose to keep refershing pacing rate even when we are app-limited, // so that when we exit app-limited, we have an updated pacing rate. But I don't @@ -46,14 +46,25 @@ void DefaultPacer::refreshPacingRate( pacingRateCalculator_(conn_, cwndBytes, minCwndInMss_, rtt); writeInterval_ = pacingRate.interval; batchSize_ = pacingRate.burstSize; + tokens_ += batchSize_; } void DefaultPacer::onPacedWriteScheduled(TimePoint currentTime) { scheduledWriteTime_ = currentTime; } +void DefaultPacer::onPacketSent() { + if (tokens_) { + --tokens_; + } +} + +void DefaultPacer::onPacketsLoss() { + tokens_ = 0UL; +} + std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const { - return appLimited_ ? 0us : writeInterval_; + return (appLimited_ || tokens_) ? 
0us : writeInterval_; } uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) { @@ -68,13 +79,19 @@ uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) { return batchSize_; } if (!scheduledWriteTime_ || *scheduledWriteTime_ >= currentTime) { - return batchSize_; + return tokens_; } auto adjustedInterval = std::chrono::duration_cast( currentTime - *scheduledWriteTime_ + writeInterval_); cachedBatchSize_ = std::ceil( adjustedInterval.count() * batchSize_ * 1.0 / writeInterval_.count()); - return cachedBatchSize_; + if (cachedBatchSize_ < batchSize_) { + LOG(ERROR) + << "Quic pacer batch size calculation: cachedBatchSize < batchSize"; + } + tokens_ += + (cachedBatchSize_ > batchSize_ ? cachedBatchSize_ - batchSize_ : 0); + return tokens_; } uint64_t DefaultPacer::getCachedWriteBatchSize() const { diff --git a/quic/congestion_control/Pacer.h b/quic/congestion_control/Pacer.h index 0986e232d..c4bb9f24b 100644 --- a/quic/congestion_control/Pacer.h +++ b/quic/congestion_control/Pacer.h @@ -43,6 +43,9 @@ class DefaultPacer : public Pacer { void setAppLimited(bool limited) override; + void onPacketSent() override; + void onPacketsLoss() override; + private: const QuicConnectionStateBase& conn_; uint64_t minCwndInMss_; @@ -52,5 +55,6 @@ class DefaultPacer : public Pacer { PacingRateCalculator pacingRateCalculator_; uint64_t cachedBatchSize_; bool appLimited_{false}; + uint64_t tokens_; }; } // namespace quic diff --git a/quic/congestion_control/QuicCubic.cpp b/quic/congestion_control/QuicCubic.cpp index 39a05ffb7..3264c847b 100644 --- a/quic/congestion_control/QuicCubic.cpp +++ b/quic/congestion_control/QuicCubic.cpp @@ -384,6 +384,9 @@ void Cubic::onPacketAckOrLoss( // furture. 
if (lossEvent) { onPacketLoss(*lossEvent); + if (conn_.pacer) { + conn_.pacer->onPacketsLoss(); + } } if (ackEvent && ackEvent->largestAckedPacket.hasValue()) { CHECK(!ackEvent->ackedPackets.empty()); diff --git a/quic/congestion_control/test/BbrTest.cpp b/quic/congestion_control/test/BbrTest.cpp index a848501d2..2ca7fecd1 100644 --- a/quic/congestion_control/test/BbrTest.cpp +++ b/quic/congestion_control/test/BbrTest.cpp @@ -667,5 +667,21 @@ TEST_F(BbrTest, UpdatePacerAppLimited) { makeAck(0, 1000, Clock::now(), Clock::now() - 5ms), folly::none); } +TEST_F(BbrTest, PacketLossInvokesPacer) { + QuicConnectionStateBase conn(QuicNodeType::Client); + BbrCongestionController::BbrConfig config; + BbrCongestionController bbr(conn, config); + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn.pacer = std::move(mockPacer); + + auto packet = makeTestingWritePacket(0, 1000, 1000); + bbr.onPacketSent(packet); + EXPECT_CALL(*rawPacer, onPacketsLoss()).Times(1); + CongestionController::LossEvent lossEvent; + lossEvent.addLostPacket(packet); + bbr.onPacketAckOrLoss(folly::none, lossEvent); +} + } // namespace test } // namespace quic diff --git a/quic/congestion_control/test/CopaTest.cpp b/quic/congestion_control/test/CopaTest.cpp index 369bdca5d..6b1781a32 100644 --- a/quic/congestion_control/test/CopaTest.cpp +++ b/quic/congestion_control/test/CopaTest.cpp @@ -10,6 +10,7 @@ #include #include +#include using namespace testing; @@ -455,5 +456,19 @@ TEST_F(CopaTest, NoLargestAckedPacketNoCrash) { EXPECT_EQ(event->congestionEvent, kCongestionPacketLoss); } +TEST_F(CopaTest, PacketLossInvokesPacer) { + QuicServerConnectionState conn; + Copa copa(conn); + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn.pacer = std::move(mockPacer); + auto packet = createPacket(0 /* pacetNum */, 1000, 1000); + copa.onPacketSent(packet); + EXPECT_CALL(*rawPacer, onPacketsLoss()).Times(1); + CongestionController::LossEvent lossEvent; + 
lossEvent.addLostPacket(packet); + copa.onPacketAckOrLoss(folly::none, lossEvent); +} + } // namespace test } // namespace quic diff --git a/quic/congestion_control/test/CubicTest.cpp b/quic/congestion_control/test/CubicTest.cpp index 72c282891..f9d5c729a 100644 --- a/quic/congestion_control/test/CubicTest.cpp +++ b/quic/congestion_control/test/CubicTest.cpp @@ -303,5 +303,20 @@ TEST_F(CubicTest, PacingGain) { auto event = dynamic_cast(tmp.get()); EXPECT_EQ(event->update, kRecalculateTimeToOrigin); } + +TEST_F(CubicTest, PacetLossInvokesPacer) { + QuicConnectionStateBase conn(QuicNodeType::Client); + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn.pacer = std::move(mockPacer); + Cubic cubic(conn); + + auto packet = makeTestingWritePacket(0, 1000, 1000); + cubic.onPacketSent(packet); + EXPECT_CALL(*rawPacer, onPacketsLoss()).Times(1); + CongestionController::LossEvent lossEvent; + lossEvent.addLostPacket(packet); + cubic.onPacketAckOrLoss(folly::none, lossEvent); +} } // namespace test } // namespace quic diff --git a/quic/congestion_control/test/PacerTest.cpp b/quic/congestion_control/test/PacerTest.cpp index b499952e4..c7db460cc 100644 --- a/quic/congestion_control/test/PacerTest.cpp +++ b/quic/congestion_control/test/PacerTest.cpp @@ -14,6 +14,14 @@ using namespace testing; namespace quic { namespace test { +namespace { +void consumeTokensHelper(Pacer& pacer, size_t tokensToConsume) { + for (size_t i = 0; i < tokensToConsume; i++) { + pacer.onPacketSent(); + } +} +} // namespace + class PacerTest : public Test { public: void SetUp() override { @@ -40,8 +48,13 @@ TEST_F(PacerTest, RateCalculator) { return PacingRate::Builder().setInterval(1234us).setBurstSize(4321).build(); }); pacer.refreshPacingRate(200000, 200us); + EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); + EXPECT_EQ( + 4321 + conn.transportSettings.writeConnectionDataPacketsLimit, + pacer.updateAndGetWriteBatchSize(Clock::now())); + consumeTokensHelper( + pacer, 4321 + 
conn.transportSettings.writeConnectionDataPacketsLimit); EXPECT_EQ(1234us, pacer.getTimeUntilNextWrite()); - EXPECT_EQ(4321, pacer.updateAndGetWriteBatchSize(Clock::now())); } TEST_F(PacerTest, CompensateTimerDrift) { @@ -54,11 +67,22 @@ auto currentTime = Clock::now(); pacer.refreshPacingRate(20, 100us); // These two values do not matter here pacer.onPacedWriteScheduled(currentTime); - EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(currentTime + 1000us)); + EXPECT_EQ( + 20 + conn.transportSettings.writeConnectionDataPacketsLimit, + pacer.updateAndGetWriteBatchSize(currentTime + 1000us)); // Query batch size again without calling onPacedWriteScheduled won't do timer - // drift compensation - EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(currentTime + 2000us)); + // drift compensation. But tokens_ keeps the last compensation. + EXPECT_EQ( + 20 + conn.transportSettings.writeConnectionDataPacketsLimit, + pacer.updateAndGetWriteBatchSize(currentTime + 2000us)); + + // Consume a few: + consumeTokensHelper(pacer, 3); + + EXPECT_EQ( + 20 + conn.transportSettings.writeConnectionDataPacketsLimit - 3, + pacer.updateAndGetWriteBatchSize(currentTime + 2000us)); } TEST_F(PacerTest, NextWriteTime) { @@ -71,6 +95,15 @@ return PacingRate::Builder().setInterval(rtt).setBurstSize(10).build(); }); pacer.refreshPacingRate(20, 1000us); + // Right after refresh, it's always 0us. You can always send right after an + // ack.
+ EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); + + // Consume all the tokens: + consumeTokensHelper( + pacer, 10 + conn.transportSettings.writeConnectionDataPacketsLimit); + + // Then we use real delay: EXPECT_EQ(1000us, pacer.getTimeUntilNextWrite()); } @@ -125,5 +158,48 @@ TEST_F(PacerTest, AppLimited) { EXPECT_EQ(12, pacer.updateAndGetWriteBatchSize(Clock::now())); } +TEST_F(PacerTest, Tokens) { + // Pacer has tokens right after init: + EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); + EXPECT_EQ( + conn.transportSettings.writeConnectionDataPacketsLimit, + pacer.updateAndGetWriteBatchSize(Clock::now())); + + // Consume all initial tokens: + consumeTokensHelper( + pacer, conn.transportSettings.writeConnectionDataPacketsLimit); + + // Pacing rate: 10 mss per 10 ms + pacer.setPacingRateCalculator([](const QuicConnectionStateBase&, + uint64_t, + uint64_t, + std::chrono::microseconds) { + return PacingRate::Builder().setInterval(10ms).setBurstSize(10).build(); + }); + + // These inputs don't matter, the rate calculator above returns fixed values.
+ pacer.refreshPacingRate(100, 100ms); + + EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); + EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(Clock::now())); + + // Consume all tokens: + consumeTokensHelper(pacer, 10); + + EXPECT_EQ(10ms, pacer.getTimeUntilNextWrite()); + EXPECT_EQ(0, pacer.updateAndGetWriteBatchSize(Clock::now())); + + // Do a schedule: + auto curTime = Clock::now(); + pacer.onPacedWriteScheduled(curTime); + // 10ms later you should have 10 mss credit: + EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(curTime + 10ms)); + + // Schedule again from this point: + pacer.onPacedWriteScheduled(curTime + 10ms); + // Then elapse another 10ms, and previous tokens hasn't been used: + EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(curTime + 20ms)); +} + } // namespace test } // namespace quic diff --git a/quic/state/StateData.h b/quic/state/StateData.h index 4e6e00a8d..0f1ed9098 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -165,6 +165,8 @@ struct Pacer { virtual uint64_t getCachedWriteBatchSize() const = 0; virtual void setAppLimited(bool limited) = 0; + virtual void onPacketSent() = 0; + virtual void onPacketsLoss() = 0; }; struct PacingRate { diff --git a/quic/state/test/Mocks.h b/quic/state/test/Mocks.h index d9e9bc9ea..115341c44 100644 --- a/quic/state/test/Mocks.h +++ b/quic/state/test/Mocks.h @@ -40,6 +40,8 @@ class MockPacer : public Pacer { MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint)); MOCK_CONST_METHOD0(getCachedWriteBatchSize, uint64_t()); MOCK_METHOD1(setAppLimited, void(bool)); + MOCK_METHOD0(onPacketSent, void()); + MOCK_METHOD0(onPacketsLoss, void()); }; } // namespace test } // namespace quic