diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index c675c4767..07f39216e 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -49,7 +49,6 @@ QuicTransportBase::QuicTransportBase( LooperType::WriteLooper)) { writeLooper_->setPacingFunction([this]() -> auto { if (isConnectionPaced(*conn_)) { - conn_->pacer->onPacedWriteScheduled(Clock::now()); return conn_->pacer->getTimeUntilNextWrite(); } return 0us; diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 067309ae4..a775e0755 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -402,7 +402,7 @@ void updateConnection( } } if (conn.pacer) { - conn.pacer->onPacketSent(); + conn.pacer->onPacketSent(pkt.encodedSize); } if (conn.pathValidationLimiter && (conn.pendingEvents.pathChallenge || conn.outstandingPathValidation)) { diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index a8e37f6ce..1cc0c3081 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -765,7 +765,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithPureAck) { ackFrame.ackBlocks.emplace_back(0, 10); packet.packet.frames.push_back(std::move(ackFrame)); EXPECT_CALL(*rawController, onPacketSent(_)).Times(0); - EXPECT_CALL(*rawPacer, onPacketSent()).Times(0); + EXPECT_CALL(*rawPacer, onPacketSent(_)).Times(0); updateConnection( *conn, folly::none, packet.packet, TimePoint(), getEncodedSize(packet)); EXPECT_EQ(0, conn->outstandingPackets.size()); @@ -1020,7 +1020,7 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicdataToSocketWithPacer) { IOBuf::copyBuffer("0123456789012012345678901201234567890120123456789012"); writeDataToQuicStream(*stream1, buf->clone(), true); - EXPECT_CALL(*rawPacer, onPacketSent()).Times(1); + EXPECT_CALL(*rawPacer, onPacketSent(_)).Times(1); EXPECT_CALL(*transportInfoCb_, onWrite(_)); writeQuicDataToSocket( *rawSocket, diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 63a6b337c..3328845a3 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -2511,7 +2511,6 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) { EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) .WillRepeatedly(Return(1)); - EXPECT_CALL(*rawPacer, onPacedWriteScheduled(_)); EXPECT_CALL(*rawPacer, getTimeUntilNextWrite()) .WillRepeatedly(Return(3600000ms)); // This will write out 100 bytes, leave 100 bytes behind. FunctionLooper will diff --git a/quic/congestion_control/CongestionControlFunctions.cpp b/quic/congestion_control/CongestionControlFunctions.cpp index b8ecf5f83..465d6f5a4 100644 --- a/quic/congestion_control/CongestionControlFunctions.cpp +++ b/quic/congestion_control/CongestionControlFunctions.cpp @@ -29,29 +29,21 @@ PacingRate calculatePacingRate( uint64_t cwnd, uint64_t minCwndInMss, std::chrono::microseconds rtt) { - if (conn.transportSettings.pacingTimerTickInterval > rtt) { + if (conn.transportSettings.pacingTimerTickInterval >= rtt) { // We cannot really pace in this case. return PacingRate::Builder() .setInterval(0us) .setBurstSize(conn.transportSettings.writeConnectionDataPacketsLimit) .build(); } + uint64_t numIntervals = rtt / conn.transportSettings.pacingTimerTickInterval; uint64_t cwndInPackets = std::max(minCwndInMss, cwnd / conn.udpSendPacketLen); - // Each interval we want to send cwndInpackets / (rtt / minimalInverval) - // number of packets. uint64_t burstPerInterval = std::max( - conn.transportSettings.minBurstPackets, - static_cast(std::ceil( - static_cast(cwndInPackets) * - static_cast( - conn.transportSettings.pacingTimerTickInterval.count()) / - static_cast(rtt.count())))); - auto interval = timeMax( - conn.transportSettings.pacingTimerTickInterval, - rtt * burstPerInterval / cwndInPackets); + conn.transportSettings.minBurstPackets, cwndInPackets / numIntervals); return PacingRate::Builder() - .setInterval(interval) - .setBurstSize(burstPerInterval) - .build(); + .setInterval(conn.transportSettings.pacingTimerTickInterval) + .setBurstSize(burstPerInterval) + .build(); } + } // namespace quic diff --git a/quic/congestion_control/Pacer.cpp b/quic/congestion_control/Pacer.cpp index eb196bf89..463c33cba 100644 --- a/quic/congestion_control/Pacer.cpp +++ b/quic/congestion_control/Pacer.cpp @@ -8,6 +8,7 @@ #include +#include #include #include @@ -21,7 +22,9 @@ DefaultPacer::DefaultPacer( batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), pacingRateCalculator_(calculatePacingRate), cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), - tokens_(conn.transportSettings.writeConnectionDataPacketsLimit) {} + tokens_(conn.transportSettings.writeConnectionDataPacketsLimit), + nextWriteTime_(Clock::now()), + lastWriteTime_(Clock::now()) {} // TODO: we choose to keep refershing pacing rate even when we are app-limited, // so that when we exit app-limited, we have an updated pacing rate. But I don't @@ -29,15 +32,17 @@ DefaultPacer::DefaultPacer( void DefaultPacer::refreshPacingRate( uint64_t cwndBytes, std::chrono::microseconds rtt) { + auto currentTime = Clock::now(); if (rtt < conn_.transportSettings.pacingTimerTickInterval) { writeInterval_ = 0us; batchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit; } else { - const PacingRate pacingRate = + const auto pacingRate = pacingRateCalculator_(conn_, cwndBytes, minCwndInMss_, rtt); writeInterval_ = pacingRate.interval; batchSize_ = pacingRate.burstSize; - tokens_ += batchSize_; + lastPacingRateUpdate_ = currentTime; + bytesSentSincePacingRateUpdate_ = 0; } if (conn_.qLogger) { conn_.qLogger->addPacingMetricUpdate(batchSize_, writeInterval_); @@ -45,16 +50,40 @@ void DefaultPacer::refreshPacingRate( QUIC_TRACE( pacing_update, conn_, writeInterval_.count(), (uint64_t)batchSize_); cachedBatchSize_ = batchSize_; + tokens_ = batchSize_; + nextWriteTime_ = currentTime; + if (firstUpdate_) { + firstUpdate_ = false; + lastWriteTime_ = currentTime; + } } -void DefaultPacer::onPacedWriteScheduled(TimePoint currentTime) { - scheduledWriteTime_ = currentTime; -} - -void DefaultPacer::onPacketSent() { +void DefaultPacer::onPacketSent(uint64_t bytesSent) { if (tokens_) { --tokens_; } + bytesSentSincePacingRateUpdate_ += bytesSent; + if (writeInterval_ != 0us && cachedBatchSize_ && !appLimited_ && + lastPacingRateUpdate_) { + Bandwidth expectedBandwidth( + cachedBatchSize_ * conn_.udpSendPacketLen, writeInterval_); + if (expectedBandwidth) { + Bandwidth actualPacingBandwidth( + bytesSentSincePacingRateUpdate_, + std::chrono::duration_cast( + Clock::now() - *lastPacingRateUpdate_)); + pacingLimited_ = actualPacingBandwidth < expectedBandwidth; + } + } else { + pacingLimited_ = false; + } + if (!pacingLimited_) { + lastWriteTime_ = Clock::now(); + } +} + +bool DefaultPacer::isPacingLimited() const noexcept { + return pacingLimited_; } void DefaultPacer::onPacketsLoss() { @@ -62,34 +91,30 @@ void DefaultPacer::onPacketsLoss() { } std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const { - return (appLimited_ || tokens_) ? 0us : writeInterval_; + return (writeInterval_ == 0us || appLimited_ || tokens_ || + Clock::now() + conn_.transportSettings.pacingTimerTickInterval >= + nextWriteTime_) + ? 0us + : timeMax( + conn_.transportSettings.pacingTimerTickInterval, + timeMin( + writeInterval_, + std::chrono::duration_cast( + nextWriteTime_ - Clock::now()))); } uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) { SCOPE_EXIT { - scheduledWriteTime_.clear(); + lastWriteTime_ = nextWriteTime_; + nextWriteTime_ += writeInterval_; }; - if (appLimited_) { - cachedBatchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit; - return cachedBatchSize_; - } - if (writeInterval_ == 0us) { - return batchSize_; - } - if (!scheduledWriteTime_ || *scheduledWriteTime_ >= currentTime) { - return tokens_; + if (appLimited_ || writeInterval_ == 0us) { + return conn_.transportSettings.writeConnectionDataPacketsLimit; } auto adjustedInterval = std::chrono::duration_cast( - currentTime - *scheduledWriteTime_ + writeInterval_); - cachedBatchSize_ = std::ceil( + timeMax(currentTime - lastWriteTime_, writeInterval_)); + return std::ceil( adjustedInterval.count() * batchSize_ * 1.0 / writeInterval_.count()); - if (cachedBatchSize_ < batchSize_) { - LOG(ERROR) - << "Quic pacer batch size calculation: cachedBatchSize < batchSize"; - } - tokens_ += - (cachedBatchSize_ > batchSize_ ? cachedBatchSize_ - batchSize_ : 0); - return tokens_; } uint64_t DefaultPacer::getCachedWriteBatchSize() const { diff --git a/quic/congestion_control/Pacer.h b/quic/congestion_control/Pacer.h index c4bb9f24b..2b7d312ce 100644 --- a/quic/congestion_control/Pacer.h +++ b/quic/congestion_control/Pacer.h @@ -8,6 +8,7 @@ #pragma once +#include #include namespace quic { @@ -31,8 +32,6 @@ class DefaultPacer : public Pacer { void refreshPacingRate(uint64_t cwndBytes, std::chrono::microseconds rtt) override; - void onPacedWriteScheduled(TimePoint currentTime) override; - std::chrono::microseconds getTimeUntilNextWrite() const override; uint64_t updateAndGetWriteBatchSize(TimePoint currentTime) override; @@ -43,9 +42,12 @@ class DefaultPacer : public Pacer { void setAppLimited(bool limited) override; - void onPacketSent() override; + void onPacketSent(uint64_t bytesSent) override; void onPacketsLoss() override; + // Only used for test: + bool isPacingLimited() const noexcept; + private: const QuicConnectionStateBase& conn_; uint64_t minCwndInMss_; @@ -56,5 +58,11 @@ class DefaultPacer : public Pacer { uint64_t cachedBatchSize_; bool appLimited_{false}; uint64_t tokens_; + uint64_t bytesSentSincePacingRateUpdate_{0}; + folly::Optional lastPacingRateUpdate_; + bool pacingLimited_{false}; + TimePoint nextWriteTime_; + TimePoint lastWriteTime_; + bool firstUpdate_{true}; }; } // namespace quic diff --git a/quic/congestion_control/test/CongestionControlFunctionsTest.cpp b/quic/congestion_control/test/CongestionControlFunctionsTest.cpp index d708519d0..2fb836c9e 100644 --- a/quic/congestion_control/test/CongestionControlFunctionsTest.cpp +++ b/quic/congestion_control/test/CongestionControlFunctionsTest.cpp @@ -43,9 +43,7 @@ TEST_F(CongestionControlFunctionsTest, MinPacingRate) { conn.transportSettings.pacingTimerTickInterval = 1ms; auto result = calculatePacingRate( conn, 100, conn.transportSettings.minCwndInMss, 100ms); - // 100 ms rtt, 1ms tick interval, 100 mss cwnd, 5 mss min burst -> 5 mss every - // 5ms - EXPECT_EQ(5ms, result.interval); + EXPECT_EQ(1ms, result.interval); EXPECT_EQ(conn.transportSettings.minBurstPackets, result.burstSize); } @@ -56,7 +54,7 @@ TEST_F(CongestionControlFunctionsTest, SmallCwnd) { conn.transportSettings.pacingTimerTickInterval = 1ms; auto result = calculatePacingRate( conn, 10, conn.transportSettings.minCwndInMss, 100000us); - EXPECT_EQ(10ms, result.interval); + EXPECT_EQ(1ms, result.interval); EXPECT_EQ(1, result.burstSize); } @@ -72,6 +70,5 @@ TEST_F(CongestionControlFunctionsTest, RttSmallerThanInterval) { conn.transportSettings.writeConnectionDataPacketsLimit, result.burstSize); } - } // namespace test } // namespace quic diff --git a/quic/congestion_control/test/PacerTest.cpp b/quic/congestion_control/test/PacerTest.cpp index c7db460cc..1cba3b32a 100644 --- a/quic/congestion_control/test/PacerTest.cpp +++ b/quic/congestion_control/test/PacerTest.cpp @@ -17,7 +17,7 @@ namespace test { namespace { void consumeTokensHelper(Pacer& pacer, size_t tokensToConsume) { for (size_t i = 0; i < tokensToConsume; i++) { - pacer.onPacketSent(); + pacer.onPacketSent(1000); } } } // namespace @@ -45,16 +45,19 @@ TEST_F(PacerTest, RateCalculator) { uint64_t, uint64_t, std::chrono::microseconds) { - return PacingRate::Builder().setInterval(1234us).setBurstSize(4321).build(); + return PacingRate::Builder().setInterval(500ms).setBurstSize(4321).build(); }); + auto currentTime = Clock::now(); pacer.refreshPacingRate(200000, 200us); EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); - EXPECT_EQ( - 4321 + conn.transportSettings.writeConnectionDataPacketsLimit, - pacer.updateAndGetWriteBatchSize(Clock::now())); - consumeTokensHelper( - pacer, 4321 + conn.transportSettings.writeConnectionDataPacketsLimit); - EXPECT_EQ(1234us, pacer.getTimeUntilNextWrite()); + EXPECT_EQ(4321, pacer.updateAndGetWriteBatchSize(currentTime)); + consumeTokensHelper(pacer, 4321); + EXPECT_NEAR( + std::chrono::duration_cast( + 500ms + currentTime - Clock::now()) + .count(), + pacer.getTimeUntilNextWrite().count(), + 2000); } TEST_F(PacerTest, CompensateTimerDrift) { @@ -66,23 +69,18 @@ TEST_F(PacerTest, CompensateTimerDrift) { }); auto currentTime = Clock::now(); pacer.refreshPacingRate(20, 100us); // These two values do not matter here - pacer.onPacedWriteScheduled(currentTime); - EXPECT_EQ( - 20 + conn.transportSettings.writeConnectionDataPacketsLimit, - pacer.updateAndGetWriteBatchSize(currentTime + 1000us)); + // After refresh, both last and next write time is very close to currentTime + EXPECT_NEAR(10, pacer.updateAndGetWriteBatchSize(currentTime + 1000us), 2); + // lastWriteTime ~= currentTime, nextWriteTime ~= currentTime + 1000us - // Query batch size again without calling onPacedWriteScheduled won't do timer - // drift compensation. But token_ keeps the last compenstation. - EXPECT_EQ( - 20 + conn.transportSettings.writeConnectionDataPacketsLimit, - pacer.updateAndGetWriteBatchSize(currentTime + 2000us)); + EXPECT_NEAR(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us), 2); + // lastWriteTime ~= currentTime + 1000us, nextWriteTime ~= currentTime + + // 2000us // Consume a few: consumeTokensHelper(pacer, 3); - EXPECT_EQ( - 20 + conn.transportSettings.writeConnectionDataPacketsLimit - 3, - pacer.updateAndGetWriteBatchSize(currentTime + 2000us)); + EXPECT_NEAR(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us), 2); } TEST_F(PacerTest, NextWriteTime) { @@ -94,17 +92,18 @@ TEST_F(PacerTest, NextWriteTime) { std::chrono::microseconds rtt) { return PacingRate::Builder().setInterval(rtt).setBurstSize(10).build(); }); - pacer.refreshPacingRate(20, 1000us); + auto currentTime = Clock::now(); + pacer.refreshPacingRate(20, 100ms); // Right after refresh, it's always 0us. You can always send right after an // ack. EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); + pacer.updateAndGetWriteBatchSize(currentTime); // Consume all the tokens: - consumeTokensHelper( - pacer, 10 + conn.transportSettings.writeConnectionDataPacketsLimit); + consumeTokensHelper(pacer, 10); // Then we use real delay: - EXPECT_EQ(1000us, pacer.getTimeUntilNextWrite()); + EXPECT_NEAR(100 * 1000, pacer.getTimeUntilNextWrite().count(), 1000); } TEST_F(PacerTest, ImpossibleToPace) { @@ -138,17 +137,16 @@ TEST_F(PacerTest, CachedBatchSize) { .setBurstSize(cwndBytes / conn.udpSendPacketLen * 2) .build(); }); + auto currentTime = Clock::now(); pacer.refreshPacingRate(20 * conn.udpSendPacketLen, 100ms); EXPECT_EQ(40, pacer.getCachedWriteBatchSize()); - auto currentTime = Clock::now(); - pacer.onPacedWriteScheduled(currentTime); pacer.updateAndGetWriteBatchSize(currentTime); + // lastWriteTime ~= currentTime, nextWriteTime_ ~= currentTime + 100ms EXPECT_EQ(40, pacer.getCachedWriteBatchSize()); - pacer.onPacedWriteScheduled(currentTime + 100ms); - pacer.updateAndGetWriteBatchSize(currentTime + 200ms); - EXPECT_EQ(80, pacer.getCachedWriteBatchSize()); + EXPECT_EQ(80, pacer.updateAndGetWriteBatchSize(currentTime + 200ms)); + EXPECT_EQ(40, pacer.getCachedWriteBatchSize()); } TEST_F(PacerTest, AppLimited) { @@ -158,48 +156,19 @@ TEST_F(PacerTest, AppLimited) { EXPECT_EQ(12, pacer.updateAndGetWriteBatchSize(Clock::now())); } -TEST_F(PacerTest, Tokens) { - // Pacer has tokens right after init: - EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); - EXPECT_EQ( - conn.transportSettings.writeConnectionDataPacketsLimit, - pacer.updateAndGetWriteBatchSize(Clock::now())); - - // Consume all initial tokens: - consumeTokensHelper( - pacer, conn.transportSettings.writeConnectionDataPacketsLimit); - - // Pacing rate: 10 mss per 10 ms - pacer.setPacingRateCalculator([](const QuicConnectionStateBase&, +TEST_F(PacerTest, PacingLimited) { + pacer.setPacingRateCalculator([](const QuicConnectionStateBase& conn, + uint64_t cwndBytes, uint64_t, - uint64_t, - std::chrono::microseconds) { - return PacingRate::Builder().setInterval(10ms).setBurstSize(10).build(); + std::chrono::microseconds rtt) { + return PacingRate::Builder() + .setInterval(rtt) + .setBurstSize(cwndBytes / conn.udpSendPacketLen) + .build(); }); - - // These input doesn't matter, the rate calculator above returns fixed values. - pacer.refreshPacingRate(100, 100ms); - - EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); - EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(Clock::now())); - - // Consume all tokens: - consumeTokensHelper(pacer, 10); - - EXPECT_EQ(10ms, pacer.getTimeUntilNextWrite()); - EXPECT_EQ(0, pacer.updateAndGetWriteBatchSize(Clock::now())); - - // Do a schedule: - auto curTime = Clock::now(); - pacer.onPacedWriteScheduled(curTime); - // 10ms later you should have 10 mss credit: - EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(curTime + 10ms)); - - // Schedule again from this point: - pacer.onPacedWriteScheduled(curTime + 10ms); - // Then elapse another 10ms, and previous tokens hasn't been used: - EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(curTime + 20ms)); + pacer.refreshPacingRate(2000 * conn.udpSendPacketLen, 1us); + pacer.onPacketSent(1); + EXPECT_TRUE(pacer.isPacingLimited()); } - } // namespace test } // namespace quic diff --git a/quic/state/StateData.h b/quic/state/StateData.h index 3109dc118..7381f5ed1 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -164,14 +164,6 @@ struct Pacer { uint64_t cwndBytes, std::chrono::microseconds rtt) = 0; - /** - * Notify the Pacer that a paced write is scheduled. - * - * currentTime: the time that the timer is scheduled. NOT the time that a - * write is scheduled to happen. - */ - virtual void onPacedWriteScheduled(TimePoint currentTime) = 0; - /** * API for Trnasport to query the interval before next write */ @@ -193,7 +185,7 @@ struct Pacer { virtual uint64_t getCachedWriteBatchSize() const = 0; virtual void setAppLimited(bool limited) = 0; - virtual void onPacketSent() = 0; + virtual void onPacketSent(uint64_t sentBytes) = 0; virtual void onPacketsLoss() = 0; }; @@ -201,6 +193,7 @@ struct PacingRate { std::chrono::microseconds interval{0us}; uint64_t burstSize{0}; + PacingRate() = default; struct Builder { Builder&& setInterval(std::chrono::microseconds interval) &&; Builder&& setBurstSize(uint64_t burstSize) &&; diff --git a/quic/state/test/Mocks.h b/quic/state/test/Mocks.h index eca212d87..c5b7488fa 100644 --- a/quic/state/test/Mocks.h +++ b/quic/state/test/Mocks.h @@ -34,12 +34,11 @@ class MockCongestionController : public CongestionController { class MockPacer : public Pacer { public: MOCK_METHOD2(refreshPacingRate, void(uint64_t, std::chrono::microseconds)); - MOCK_METHOD1(onPacedWriteScheduled, void(TimePoint)); MOCK_CONST_METHOD0(getTimeUntilNextWrite, std::chrono::microseconds()); MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint)); MOCK_CONST_METHOD0(getCachedWriteBatchSize, uint64_t()); MOCK_METHOD1(setAppLimited, void(bool)); - MOCK_METHOD0(onPacketSent, void()); + MOCK_METHOD1(onPacketSent, void(uint64_t)); MOCK_METHOD0(onPacketsLoss, void()); };