diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 07f39216e..c675c4767 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -49,6 +49,7 @@ QuicTransportBase::QuicTransportBase( LooperType::WriteLooper)) { writeLooper_->setPacingFunction([this]() -> auto { if (isConnectionPaced(*conn_)) { + conn_->pacer->onPacedWriteScheduled(Clock::now()); return conn_->pacer->getTimeUntilNextWrite(); } return 0us; diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 0d7ae7e20..fec9412ab 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -402,7 +402,7 @@ void updateConnection( } } if (conn.pacer) { - conn.pacer->onPacketSent(pkt.encodedSize); + conn.pacer->onPacketSent(); } if (conn.pathValidationLimiter && (conn.pendingEvents.pathChallenge || conn.outstandingPathValidation)) { diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index 1cc0c3081..a8e37f6ce 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -765,7 +765,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithPureAck) { ackFrame.ackBlocks.emplace_back(0, 10); packet.packet.frames.push_back(std::move(ackFrame)); EXPECT_CALL(*rawController, onPacketSent(_)).Times(0); - EXPECT_CALL(*rawPacer, onPacketSent(_)).Times(0); + EXPECT_CALL(*rawPacer, onPacketSent()).Times(0); updateConnection( *conn, folly::none, packet.packet, TimePoint(), getEncodedSize(packet)); EXPECT_EQ(0, conn->outstandingPackets.size()); @@ -1020,7 +1020,7 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicdataToSocketWithPacer) { IOBuf::copyBuffer("0123456789012012345678901201234567890120123456789012"); writeDataToQuicStream(*stream1, buf->clone(), true); - EXPECT_CALL(*rawPacer, onPacketSent(_)).Times(1); + EXPECT_CALL(*rawPacer, onPacketSent()).Times(1); EXPECT_CALL(*transportInfoCb_, onWrite(_)); writeQuicDataToSocket( *rawSocket, diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 3328845a3..63a6b337c 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -2511,6 +2511,7 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) { EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) .WillRepeatedly(Return(1)); + EXPECT_CALL(*rawPacer, onPacedWriteScheduled(_)); EXPECT_CALL(*rawPacer, getTimeUntilNextWrite()) .WillRepeatedly(Return(3600000ms)); // This will write out 100 bytes, leave 100 bytes behind. FunctionLooper will diff --git a/quic/congestion_control/CongestionControlFunctions.cpp b/quic/congestion_control/CongestionControlFunctions.cpp index 465d6f5a4..b8ecf5f83 100644 --- a/quic/congestion_control/CongestionControlFunctions.cpp +++ b/quic/congestion_control/CongestionControlFunctions.cpp @@ -29,21 +29,29 @@ PacingRate calculatePacingRate( uint64_t cwnd, uint64_t minCwndInMss, std::chrono::microseconds rtt) { - if (conn.transportSettings.pacingTimerTickInterval >= rtt) { + if (conn.transportSettings.pacingTimerTickInterval > rtt) { // We cannot really pace in this case. return PacingRate::Builder() .setInterval(0us) .setBurstSize(conn.transportSettings.writeConnectionDataPacketsLimit) .build(); } - uint64_t numIntervals = rtt / conn.transportSettings.pacingTimerTickInterval; uint64_t cwndInPackets = std::max(minCwndInMss, cwnd / conn.udpSendPacketLen); + // Each interval we want to send cwndInpackets / (rtt / minimalInverval) + // number of packets. uint64_t burstPerInterval = std::max( - conn.transportSettings.minBurstPackets, cwndInPackets / numIntervals); + conn.transportSettings.minBurstPackets, + static_cast(std::ceil( + static_cast(cwndInPackets) * + static_cast( + conn.transportSettings.pacingTimerTickInterval.count()) / + static_cast(rtt.count())))); + auto interval = timeMax( + conn.transportSettings.pacingTimerTickInterval, + rtt * burstPerInterval / cwndInPackets); return PacingRate::Builder() - .setInterval(conn.transportSettings.pacingTimerTickInterval) - .setBurstSize(burstPerInterval) - .build(); + .setInterval(interval) + .setBurstSize(burstPerInterval) + .build(); } - } // namespace quic diff --git a/quic/congestion_control/Pacer.cpp b/quic/congestion_control/Pacer.cpp index 463c33cba..eb196bf89 100644 --- a/quic/congestion_control/Pacer.cpp +++ b/quic/congestion_control/Pacer.cpp @@ -8,7 +8,6 @@ #include -#include #include #include @@ -22,9 +21,7 @@ DefaultPacer::DefaultPacer( batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), pacingRateCalculator_(calculatePacingRate), cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), - tokens_(conn.transportSettings.writeConnectionDataPacketsLimit), - nextWriteTime_(Clock::now()), - lastWriteTime_(Clock::now()) {} + tokens_(conn.transportSettings.writeConnectionDataPacketsLimit) {} // TODO: we choose to keep refershing pacing rate even when we are app-limited, // so that when we exit app-limited, we have an updated pacing rate. But I don't @@ -32,17 +29,15 @@ DefaultPacer::DefaultPacer( void DefaultPacer::refreshPacingRate( uint64_t cwndBytes, std::chrono::microseconds rtt) { - auto currentTime = Clock::now(); if (rtt < conn_.transportSettings.pacingTimerTickInterval) { writeInterval_ = 0us; batchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit; } else { - const auto pacingRate = + const PacingRate pacingRate = pacingRateCalculator_(conn_, cwndBytes, minCwndInMss_, rtt); writeInterval_ = pacingRate.interval; batchSize_ = pacingRate.burstSize; - lastPacingRateUpdate_ = currentTime; - bytesSentSincePacingRateUpdate_ = 0; + tokens_ += batchSize_; } if (conn_.qLogger) { conn_.qLogger->addPacingMetricUpdate(batchSize_, writeInterval_); @@ -50,40 +45,16 @@ void DefaultPacer::refreshPacingRate( QUIC_TRACE( pacing_update, conn_, writeInterval_.count(), (uint64_t)batchSize_); cachedBatchSize_ = batchSize_; - tokens_ = batchSize_; - nextWriteTime_ = currentTime; - if (firstUpdate_) { - firstUpdate_ = false; - lastWriteTime_ = currentTime; - } } -void DefaultPacer::onPacketSent(uint64_t bytesSent) { +void DefaultPacer::onPacedWriteScheduled(TimePoint currentTime) { + scheduledWriteTime_ = currentTime; +} + +void DefaultPacer::onPacketSent() { if (tokens_) { --tokens_; } - bytesSentSincePacingRateUpdate_ += bytesSent; - if (writeInterval_ != 0us && cachedBatchSize_ && !appLimited_ && - lastPacingRateUpdate_) { - Bandwidth expectedBandwidth( - cachedBatchSize_ * conn_.udpSendPacketLen, writeInterval_); - if (expectedBandwidth) { - Bandwidth actualPacingBandwidth( - bytesSentSincePacingRateUpdate_, - std::chrono::duration_cast( - Clock::now() - *lastPacingRateUpdate_)); - pacingLimited_ = actualPacingBandwidth < expectedBandwidth; - } - } else { - pacingLimited_ = false; - } - if (!pacingLimited_) { - lastWriteTime_ = Clock::now(); - } -} - -bool DefaultPacer::isPacingLimited() const noexcept { - return pacingLimited_; } void DefaultPacer::onPacketsLoss() { @@ -91,30 +62,34 @@ void DefaultPacer::onPacketsLoss() { } std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const { - return (writeInterval_ == 0us || appLimited_ || tokens_ || - Clock::now() + conn_.transportSettings.pacingTimerTickInterval >= - nextWriteTime_) - ? 0us - : timeMax( - conn_.transportSettings.pacingTimerTickInterval, - timeMin( - writeInterval_, - std::chrono::duration_cast( - nextWriteTime_ - Clock::now()))); + return (appLimited_ || tokens_) ? 0us : writeInterval_; } uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) { SCOPE_EXIT { - lastWriteTime_ = nextWriteTime_; - nextWriteTime_ += writeInterval_; + scheduledWriteTime_.clear(); }; - if (appLimited_ || writeInterval_ == 0us) { - return conn_.transportSettings.writeConnectionDataPacketsLimit; + if (appLimited_) { + cachedBatchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit; + return cachedBatchSize_; + } + if (writeInterval_ == 0us) { + return batchSize_; + } + if (!scheduledWriteTime_ || *scheduledWriteTime_ >= currentTime) { + return tokens_; } auto adjustedInterval = std::chrono::duration_cast( - timeMax(currentTime - lastWriteTime_, writeInterval_)); - return std::ceil( + currentTime - *scheduledWriteTime_ + writeInterval_); + cachedBatchSize_ = std::ceil( adjustedInterval.count() * batchSize_ * 1.0 / writeInterval_.count()); + if (cachedBatchSize_ < batchSize_) { + LOG(ERROR) + << "Quic pacer batch size calculation: cachedBatchSize < batchSize"; + } + tokens_ += + (cachedBatchSize_ > batchSize_ ? cachedBatchSize_ - batchSize_ : 0); + return tokens_; } uint64_t DefaultPacer::getCachedWriteBatchSize() const { diff --git a/quic/congestion_control/Pacer.h b/quic/congestion_control/Pacer.h index 2b7d312ce..c4bb9f24b 100644 --- a/quic/congestion_control/Pacer.h +++ b/quic/congestion_control/Pacer.h @@ -8,7 +8,6 @@ #pragma once -#include #include namespace quic { @@ -32,6 +31,8 @@ class DefaultPacer : public Pacer { void refreshPacingRate(uint64_t cwndBytes, std::chrono::microseconds rtt) override; + void onPacedWriteScheduled(TimePoint currentTime) override; + std::chrono::microseconds getTimeUntilNextWrite() const override; uint64_t updateAndGetWriteBatchSize(TimePoint currentTime) override; @@ -42,12 +43,9 @@ class DefaultPacer : public Pacer { void setAppLimited(bool limited) override; - void onPacketSent(uint64_t bytesSent) override; + void onPacketSent() override; void onPacketsLoss() override; - // Only used for test: - bool isPacingLimited() const noexcept; - private: const QuicConnectionStateBase& conn_; uint64_t minCwndInMss_; @@ -58,11 +56,5 @@ class DefaultPacer : public Pacer { uint64_t cachedBatchSize_; bool appLimited_{false}; uint64_t tokens_; - uint64_t bytesSentSincePacingRateUpdate_{0}; - folly::Optional lastPacingRateUpdate_; - bool pacingLimited_{false}; - TimePoint nextWriteTime_; - TimePoint lastWriteTime_; - bool firstUpdate_{true}; }; } // namespace quic diff --git a/quic/congestion_control/test/CongestionControlFunctionsTest.cpp b/quic/congestion_control/test/CongestionControlFunctionsTest.cpp index 2fb836c9e..d708519d0 100644 --- a/quic/congestion_control/test/CongestionControlFunctionsTest.cpp +++ b/quic/congestion_control/test/CongestionControlFunctionsTest.cpp @@ -43,7 +43,9 @@ TEST_F(CongestionControlFunctionsTest, MinPacingRate) { conn.transportSettings.pacingTimerTickInterval = 1ms; auto result = calculatePacingRate( conn, 100, conn.transportSettings.minCwndInMss, 100ms); - EXPECT_EQ(1ms, result.interval); + // 100 ms rtt, 1ms tick interval, 100 mss cwnd, 5 mss min burst -> 5 mss every + // 5ms + EXPECT_EQ(5ms, result.interval); EXPECT_EQ(conn.transportSettings.minBurstPackets, result.burstSize); } @@ -54,7 +56,7 @@ TEST_F(CongestionControlFunctionsTest, SmallCwnd) { conn.transportSettings.pacingTimerTickInterval = 1ms; auto result = calculatePacingRate( conn, 10, conn.transportSettings.minCwndInMss, 100000us); - EXPECT_EQ(1ms, result.interval); + EXPECT_EQ(10ms, result.interval); EXPECT_EQ(1, result.burstSize); } @@ -70,5 +72,6 @@ TEST_F(CongestionControlFunctionsTest, RttSmallerThanInterval) { conn.transportSettings.writeConnectionDataPacketsLimit, result.burstSize); } + } // namespace test } // namespace quic diff --git a/quic/congestion_control/test/PacerTest.cpp b/quic/congestion_control/test/PacerTest.cpp index 1cba3b32a..c7db460cc 100644 --- a/quic/congestion_control/test/PacerTest.cpp +++ b/quic/congestion_control/test/PacerTest.cpp @@ -17,7 +17,7 @@ namespace test { namespace { void consumeTokensHelper(Pacer& pacer, size_t tokensToConsume) { for (size_t i = 0; i < tokensToConsume; i++) { - pacer.onPacketSent(1000); + pacer.onPacketSent(); } } } // namespace @@ -45,19 +45,16 @@ TEST_F(PacerTest, RateCalculator) { uint64_t, uint64_t, std::chrono::microseconds) { - return PacingRate::Builder().setInterval(500ms).setBurstSize(4321).build(); + return PacingRate::Builder().setInterval(1234us).setBurstSize(4321).build(); }); - auto currentTime = Clock::now(); pacer.refreshPacingRate(200000, 200us); EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); - EXPECT_EQ(4321, pacer.updateAndGetWriteBatchSize(currentTime)); - consumeTokensHelper(pacer, 4321); - EXPECT_NEAR( - std::chrono::duration_cast( - 500ms + currentTime - Clock::now()) - .count(), - pacer.getTimeUntilNextWrite().count(), - 2000); + EXPECT_EQ( + 4321 + conn.transportSettings.writeConnectionDataPacketsLimit, + pacer.updateAndGetWriteBatchSize(Clock::now())); + consumeTokensHelper( + pacer, 4321 + conn.transportSettings.writeConnectionDataPacketsLimit); + EXPECT_EQ(1234us, pacer.getTimeUntilNextWrite()); } TEST_F(PacerTest, CompensateTimerDrift) { @@ -69,18 +66,23 @@ TEST_F(PacerTest, CompensateTimerDrift) { }); auto currentTime = Clock::now(); pacer.refreshPacingRate(20, 100us); // These two values do not matter here - // After refresh, both last and next write time is very close to currentTime - EXPECT_NEAR(10, pacer.updateAndGetWriteBatchSize(currentTime + 1000us), 2); - // lastWriteTime ~= currentTime, nextWriteTime ~= currentTime + 1000us + pacer.onPacedWriteScheduled(currentTime); + EXPECT_EQ( + 20 + conn.transportSettings.writeConnectionDataPacketsLimit, + pacer.updateAndGetWriteBatchSize(currentTime + 1000us)); - EXPECT_NEAR(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us), 2); - // lastWriteTime ~= currentTime + 1000us, nextWriteTime ~= currentTime + - // 2000us + // Query batch size again without calling onPacedWriteScheduled won't do timer + // drift compensation. But token_ keeps the last compenstation. + EXPECT_EQ( + 20 + conn.transportSettings.writeConnectionDataPacketsLimit, + pacer.updateAndGetWriteBatchSize(currentTime + 2000us)); // Consume a few: consumeTokensHelper(pacer, 3); - EXPECT_NEAR(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us), 2); + EXPECT_EQ( + 20 + conn.transportSettings.writeConnectionDataPacketsLimit - 3, + pacer.updateAndGetWriteBatchSize(currentTime + 2000us)); } TEST_F(PacerTest, NextWriteTime) { @@ -92,18 +94,17 @@ TEST_F(PacerTest, NextWriteTime) { std::chrono::microseconds rtt) { return PacingRate::Builder().setInterval(rtt).setBurstSize(10).build(); }); - auto currentTime = Clock::now(); - pacer.refreshPacingRate(20, 100ms); + pacer.refreshPacingRate(20, 1000us); // Right after refresh, it's always 0us. You can always send right after an // ack. EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); - pacer.updateAndGetWriteBatchSize(currentTime); // Consume all the tokens: - consumeTokensHelper(pacer, 10); + consumeTokensHelper( + pacer, 10 + conn.transportSettings.writeConnectionDataPacketsLimit); // Then we use real delay: - EXPECT_NEAR(100 * 1000, pacer.getTimeUntilNextWrite().count(), 1000); + EXPECT_EQ(1000us, pacer.getTimeUntilNextWrite()); } TEST_F(PacerTest, ImpossibleToPace) { @@ -137,16 +138,17 @@ TEST_F(PacerTest, CachedBatchSize) { .setBurstSize(cwndBytes / conn.udpSendPacketLen * 2) .build(); }); - auto currentTime = Clock::now(); pacer.refreshPacingRate(20 * conn.udpSendPacketLen, 100ms); EXPECT_EQ(40, pacer.getCachedWriteBatchSize()); + auto currentTime = Clock::now(); + pacer.onPacedWriteScheduled(currentTime); pacer.updateAndGetWriteBatchSize(currentTime); - // lastWriteTime ~= currentTime, nextWriteTime_ ~= currentTime + 100ms EXPECT_EQ(40, pacer.getCachedWriteBatchSize()); - EXPECT_EQ(80, pacer.updateAndGetWriteBatchSize(currentTime + 200ms)); - EXPECT_EQ(40, pacer.getCachedWriteBatchSize()); + pacer.onPacedWriteScheduled(currentTime + 100ms); + pacer.updateAndGetWriteBatchSize(currentTime + 200ms); + EXPECT_EQ(80, pacer.getCachedWriteBatchSize()); } TEST_F(PacerTest, AppLimited) { @@ -156,19 +158,48 @@ TEST_F(PacerTest, AppLimited) { EXPECT_EQ(12, pacer.updateAndGetWriteBatchSize(Clock::now())); } -TEST_F(PacerTest, PacingLimited) { - pacer.setPacingRateCalculator([](const QuicConnectionStateBase& conn, - uint64_t cwndBytes, +TEST_F(PacerTest, Tokens) { + // Pacer has tokens right after init: + EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); + EXPECT_EQ( + conn.transportSettings.writeConnectionDataPacketsLimit, + pacer.updateAndGetWriteBatchSize(Clock::now())); + + // Consume all initial tokens: + consumeTokensHelper( + pacer, conn.transportSettings.writeConnectionDataPacketsLimit); + + // Pacing rate: 10 mss per 10 ms + pacer.setPacingRateCalculator([](const QuicConnectionStateBase&, uint64_t, - std::chrono::microseconds rtt) { - return PacingRate::Builder() - .setInterval(rtt) - .setBurstSize(cwndBytes / conn.udpSendPacketLen) - .build(); + uint64_t, + std::chrono::microseconds) { + return PacingRate::Builder().setInterval(10ms).setBurstSize(10).build(); }); - pacer.refreshPacingRate(2000 * conn.udpSendPacketLen, 1us); - pacer.onPacketSent(1); - EXPECT_TRUE(pacer.isPacingLimited()); + + // These input doesn't matter, the rate calculator above returns fixed values. + pacer.refreshPacingRate(100, 100ms); + + EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); + EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(Clock::now())); + + // Consume all tokens: + consumeTokensHelper(pacer, 10); + + EXPECT_EQ(10ms, pacer.getTimeUntilNextWrite()); + EXPECT_EQ(0, pacer.updateAndGetWriteBatchSize(Clock::now())); + + // Do a schedule: + auto curTime = Clock::now(); + pacer.onPacedWriteScheduled(curTime); + // 10ms later you should have 10 mss credit: + EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(curTime + 10ms)); + + // Schedule again from this point: + pacer.onPacedWriteScheduled(curTime + 10ms); + // Then elapse another 10ms, and previous tokens hasn't been used: + EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(curTime + 20ms)); } + } // namespace test } // namespace quic diff --git a/quic/state/StateData.h b/quic/state/StateData.h index b1e71e937..7eb435e04 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -179,6 +179,14 @@ struct Pacer { uint64_t cwndBytes, std::chrono::microseconds rtt) = 0; + /** + * Notify the Pacer that a paced write is scheduled. + * + * currentTime: the time that the timer is scheduled. NOT the time that a + * write is scheduled to happen. + */ + virtual void onPacedWriteScheduled(TimePoint currentTime) = 0; + /** * API for Trnasport to query the interval before next write */ @@ -200,7 +208,7 @@ struct Pacer { virtual uint64_t getCachedWriteBatchSize() const = 0; virtual void setAppLimited(bool limited) = 0; - virtual void onPacketSent(uint64_t sentBytes) = 0; + virtual void onPacketSent() = 0; virtual void onPacketsLoss() = 0; }; @@ -208,7 +216,6 @@ struct PacingRate { std::chrono::microseconds interval{0us}; uint64_t burstSize{0}; - PacingRate() = default; struct Builder { Builder&& setInterval(std::chrono::microseconds interval) &&; Builder&& setBurstSize(uint64_t burstSize) &&; diff --git a/quic/state/test/Mocks.h b/quic/state/test/Mocks.h index c5b7488fa..eca212d87 100644 --- a/quic/state/test/Mocks.h +++ b/quic/state/test/Mocks.h @@ -34,11 +34,12 @@ class MockCongestionController : public CongestionController { class MockPacer : public Pacer { public: MOCK_METHOD2(refreshPacingRate, void(uint64_t, std::chrono::microseconds)); + MOCK_METHOD1(onPacedWriteScheduled, void(TimePoint)); MOCK_CONST_METHOD0(getTimeUntilNextWrite, std::chrono::microseconds()); MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint)); MOCK_CONST_METHOD0(getCachedWriteBatchSize, uint64_t()); MOCK_METHOD1(setAppLimited, void(bool)); - MOCK_METHOD1(onPacketSent, void(uint64_t)); + MOCK_METHOD0(onPacketSent, void()); MOCK_METHOD0(onPacketsLoss, void()); };