diff --git a/quic/QuicConstants.h b/quic/QuicConstants.h index a53582d95..8c75410e4 100644 --- a/quic/QuicConstants.h +++ b/quic/QuicConstants.h @@ -240,6 +240,9 @@ constexpr size_t kBlockedSizeBytes = 20; constexpr uint64_t kInitCwndInMss = 10; constexpr uint64_t kMinCwndInMss = 2; +// Min cwnd for BBR is 4 MSS regard less of transport settings +constexpr uint64_t kMinCwndInMssForBbr{4}; + constexpr uint64_t kDefaultMaxCwndInMss = 2000; // When server receives early data attempt without valid source address token, // server will limit bytes in flight to avoid amplification attack until CFIN diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 325def66c..0cc5eb032 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -47,8 +48,8 @@ QuicTransportBase::QuicTransportBase( LooperType::WriteLooper)) { writeLooper_->setPacingFunction([this]() -> auto { if (isConnectionPaced(*conn_)) { - conn_->congestionController->markPacerTimeoutScheduled(Clock::now()); - return conn_->congestionController->getPacingInterval(); + conn_->pacer->onPacedWriteScheduled(Clock::now()); + return conn_->pacer->getTimeUntilNextWrite(); } return 0us; }); @@ -479,13 +480,11 @@ QuicSocket::TransportInfo QuicTransportBase::getTransportInfo() const { if (conn_->congestionController) { writableBytes = conn_->congestionController->getWritableBytes(); congestionWindow = conn_->congestionController->getCongestionWindow(); - // Do not collect pacing stats for Cubic, since getPacingRate() call - // modifies some internal state. TODO(yangchi): Remove this check after - // changing Cubic implementation. if (conn_->congestionController->type() != CongestionControlType::Cubic && isConnectionPaced(*conn_)) { - burstSize = conn_->congestionController->getPacingRate(Clock::now()); - pacingInterval = conn_->congestionController->getPacingInterval(); + // TODO: This is bad, we need an API that get without update pacing rate + burstSize = conn_->pacer->updateAndGetWriteBatchSize(Clock::now()); + pacingInterval = conn_->pacer->getTimeUntilNextWrite(); } } TransportInfo transportInfo; @@ -2290,6 +2289,14 @@ void QuicTransportBase::setTransportSettings( TransportSettings transportSettings) { conn_->transportSettings = std::move(transportSettings); setCongestionControl(transportSettings.defaultCongestionController); + if (conn_->transportSettings.pacingEnabled) { + conn_->pacer = std::make_unique( + *conn_, + transportSettings.defaultCongestionController == + CongestionControlType::BBR + ? kMinCwndInMssForBbr + : conn_->transportSettings.minCwndInMss); + } } const TransportSettings& QuicTransportBase::getTransportSettings() const { diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 3c700c57e..66e4fdd0b 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -92,7 +92,7 @@ class TestQuicTransport *headerCipher, getVersion(), (isConnectionPaced(*conn_) - ? conn_->congestionController->getPacingRate(Clock::now()) + ? conn_->pacer->updateAndGetWriteBatchSize(Clock::now()) : conn_->transportSettings.writeConnectionDataPacketsLimit)); } @@ -2466,6 +2466,9 @@ TEST_F(QuicTransportTest, PacingWillBurstFirst) { conn.congestionController = std::move(mockCongestionController); conn.transportSettings.pacingEnabled = true; conn.canBePaced = true; + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn.pacer = std::move(mockPacer); EXPECT_CALL(*rawCongestionController, getWritableBytes()) .WillRepeatedly(Return(100)); @@ -2473,7 +2476,7 @@ TEST_F(QuicTransportTest, PacingWillBurstFirst) { auto streamId = transport_->createBidirectionalStream().value(); transport_->writeChain(streamId, buf->clone(), false, false); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); - EXPECT_CALL(*rawCongestionController, getPacingRate(_)) + EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) .WillRepeatedly(Return(1)); transport_->pacedWrite(true); } @@ -2486,6 +2489,9 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) { conn.congestionController = std::move(mockCongestionController); conn.transportSettings.pacingEnabled = true; conn.canBePaced = true; + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn.pacer = std::move(mockPacer); EXPECT_CALL(*rawCongestionController, getWritableBytes()) .WillRepeatedly(Return(100)); @@ -2493,10 +2499,10 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) { auto streamId = transport_->createBidirectionalStream().value(); transport_->writeChain(streamId, buf->clone(), false, false); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); - EXPECT_CALL(*rawCongestionController, getPacingRate(_)) + EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) .WillRepeatedly(Return(1)); - EXPECT_CALL(*rawCongestionController, markPacerTimeoutScheduled(_)); - EXPECT_CALL(*rawCongestionController, getPacingInterval()) + EXPECT_CALL(*rawPacer, onPacedWriteScheduled(_)); + EXPECT_CALL(*rawPacer, getTimeUntilNextWrite()) .WillRepeatedly(Return(3600000ms)); // This will write out 100 bytes, leave 100 bytes behind. FunctionLooper will // schedule a pacing timeout. @@ -2515,6 +2521,9 @@ TEST_F(QuicTransportTest, NoScheduleIfNoNewData) { conn.congestionController = std::move(mockCongestionController); conn.transportSettings.pacingEnabled = true; conn.canBePaced = true; + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn.pacer = std::move(mockPacer); EXPECT_CALL(*rawCongestionController, getWritableBytes()) .WillRepeatedly(Return(1000)); @@ -2522,7 +2531,7 @@ TEST_F(QuicTransportTest, NoScheduleIfNoNewData) { auto streamId = transport_->createBidirectionalStream().value(); transport_->writeChain(streamId, buf->clone(), false, false); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); - EXPECT_CALL(*rawCongestionController, getPacingRate(_)) + EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) .WillRepeatedly(Return(1)); // This will write out everything. After that because there is no new data, // FunctionLooper won't schedule a pacing timeout. diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index 8328a488f..072e04780 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -723,7 +723,7 @@ void QuicClientTransport::writeData() { uint64_t packetLimit = (isConnectionPaced(*conn_) - ? conn_->congestionController->getPacingRate(Clock::now()) + ? conn_->pacer->updateAndGetWriteBatchSize(Clock::now()) : conn_->transportSettings.writeConnectionDataPacketsLimit); CryptoStreamScheduler initialScheduler( *conn_, *getCryptoStream(*conn_->cryptoState, EncryptionLevel::Initial)); diff --git a/quic/congestion_control/Bbr.cpp b/quic/congestion_control/Bbr.cpp index bceb1651f..4e199f499 100644 --- a/quic/congestion_control/Bbr.cpp +++ b/quic/congestion_control/Bbr.cpp @@ -251,17 +251,15 @@ void BbrCongestionController::onPacketAcked( // canBePaced function. Now this function is gone, maybe we need to change this // updatePacing function. void BbrCongestionController::updatePacing() noexcept { + // TODO: enable Pacing and BBR together. + if (!conn_.pacer) { + return; + } auto bandwidthEstimate = bandwidth(); if (!bandwidthEstimate) { return; } auto mrtt = minRtt(); - if (mrtt == 0us || mrtt < conn_.transportSettings.pacingTimerTickInterval) { - return; - } - // TODO(t40615081, yangchi) cloning Handshake packets make this better - VLOG_IF(10, conn_.lossState.srtt != 0us) - << "no reliable srtt sample, " << *this; uint64_t targetPacingWindow = bandwidthEstimate * pacingGain_ * mrtt; if (btlbwFound_) { pacingWindow_ = targetPacingWindow; @@ -276,12 +274,7 @@ void BbrCongestionController::updatePacing() noexcept { pacingWindow_ = std::max(pacingWindow_, targetPacingWindow); } // TODO: slower pacing if we are in STARTUP and loss has happened - std::tie(pacingInterval_, pacingBurstSize_) = - calculatePacingRate(conn_, pacingWindow_, kMinCwndInMssForBbr, mrtt); - - if (conn_.transportSettings.pacingEnabled && conn_.qLogger) { - conn_.qLogger->addPacingMetricUpdate(pacingBurstSize_, pacingInterval_); - } + conn_.pacer->refreshPacingRate(pacingWindow_, mrtt); } void BbrCongestionController::handleAckInProbeBw( @@ -582,20 +575,6 @@ void BbrCongestionController::onRemoveBytesFromInflight( subtractAndCheckUnderflow(inflightBytes_, bytesToRemove); } -uint64_t BbrCongestionController::getPacingRate( - TimePoint /* currentTime */) noexcept { - return pacingBurstSize_; -} - -std::chrono::microseconds BbrCongestionController::getPacingInterval() const - noexcept { - return pacingInterval_; -} - -void BbrCongestionController::markPacerTimeoutScheduled(TimePoint) noexcept { - /* This API is going away */ -} - std::string bbrStateToString(BbrCongestionController::BbrState state) { switch (state) { case BbrCongestionController::BbrState::Startup: diff --git a/quic/congestion_control/Bbr.h b/quic/congestion_control/Bbr.h index a8bf80630..4d74259f2 100644 --- a/quic/congestion_control/Bbr.h +++ b/quic/congestion_control/Bbr.h @@ -40,8 +40,6 @@ constexpr uint64_t kBandwidthWindowLength = kNumOfCycles + 2; constexpr std::chrono::seconds kDefaultRttSamplerExpiration{10}; // See calculateReductionFactors in QuicCubic.cpp constexpr double kBbrReductionFactor = 0.9; -// Min cwnd for BBR is 4 MSS regard less of transport settings -constexpr uint64_t kMinCwndInMssForBbr{4}; struct Bandwidth { uint64_t bytes; @@ -200,10 +198,6 @@ class BbrCongestionController : public CongestionController { bool isAppLimited() const noexcept override; - uint64_t getPacingRate(TimePoint currentTime) noexcept override; - std::chrono::microseconds getPacingInterval() const noexcept override; - void markPacerTimeoutScheduled(TimePoint) noexcept override; - // TODO: some of these do not have to be in public API. bool inRecovery() const noexcept; BbrState state() const noexcept; @@ -216,6 +210,7 @@ class BbrCongestionController : public CongestionController { void onPacketAcked(const AckEvent& ack, uint64_t prevInflightBytes, bool hasLoss); void onPacketLoss(const LossEvent&); + void updatePacing() noexcept; /** * Update the ack aggregation states @@ -264,7 +259,6 @@ class BbrCongestionController : public CongestionController { uint64_t calculateTargetCwnd(float gain) const noexcept; void updateCwnd(uint64_t ackedBytes, uint64_t excessiveBytes) noexcept; - void updatePacing() noexcept; std::chrono::microseconds minRtt() const noexcept; Bandwidth bandwidth() const noexcept; @@ -291,8 +285,6 @@ class BbrCongestionController : public CongestionController { uint64_t inflightBytes_{0}; // Number of bytes we expect to send over on RTT when paced write. uint64_t pacingWindow_{0}; - uint64_t pacingBurstSize_{0}; - std::chrono::microseconds pacingInterval_{0us}; float cwndGain_{kStartupGain}; float pacingGain_{kStartupGain}; diff --git a/quic/congestion_control/CongestionControlFunctions.cpp b/quic/congestion_control/CongestionControlFunctions.cpp index d4eb8fcfd..e9c4dff86 100644 --- a/quic/congestion_control/CongestionControlFunctions.cpp +++ b/quic/congestion_control/CongestionControlFunctions.cpp @@ -24,27 +24,17 @@ uint64_t boundedCwnd( minCwndInMss * packetLength); } -PacingRate calculatePacingRateWrapper( - const QuicConnectionStateBase& conn, - uint64_t cwnd, - uint64_t minCwndInMss, - std::chrono::microseconds rtt) { - auto result = calculatePacingRate(conn, cwnd, minCwndInMss, rtt); - return PacingRate::Builder() - .setInterval(result.first) - .setBurstSize(result.second) - .build(); -} - -std::pair calculatePacingRate( +PacingRate calculatePacingRate( const QuicConnectionStateBase& conn, uint64_t cwnd, uint64_t minCwndInMss, std::chrono::microseconds rtt) { if (conn.transportSettings.pacingTimerTickInterval > rtt) { // We cannot really pace in this case. - return std::make_pair( - 0us, conn.transportSettings.writeConnectionDataPacketsLimit); + return PacingRate::Builder() + .setInterval(0us) + .setBurstSize(conn.transportSettings.writeConnectionDataPacketsLimit) + .build(); } uint64_t cwndInPackets = std::max(minCwndInMss, cwnd / conn.udpSendPacketLen); // Each interval we want to send cwndInpackets / (rtt / minimalInverval) @@ -59,6 +49,9 @@ std::pair calculatePacingRate( auto interval = timeMax( conn.transportSettings.pacingTimerTickInterval, rtt * burstPerInterval / cwndInPackets); - return std::make_pair(interval, burstPerInterval); + return PacingRate::Builder() + .setInterval(interval) + .setBurstSize(burstPerInterval) + .build(); } } // namespace quic diff --git a/quic/congestion_control/CongestionControlFunctions.h b/quic/congestion_control/CongestionControlFunctions.h index 214ff3439..68d9c620e 100644 --- a/quic/congestion_control/CongestionControlFunctions.h +++ b/quic/congestion_control/CongestionControlFunctions.h @@ -21,15 +21,7 @@ uint64_t boundedCwnd( uint64_t maxCwndInMss, uint64_t minCwndInMss) noexcept; -std::pair calculatePacingRate( - const QuicConnectionStateBase& conn, - uint64_t cwnd, - uint64_t minCwndInMss, - std::chrono::microseconds rtt); - -// TODO: Temporary wrapper to make the transition from CC doing pacing to -// the real pacer easier. Need to remove it after real pacer is used everywhere. -PacingRate calculatePacingRateWrapper( +PacingRate calculatePacingRate( const QuicConnectionStateBase& conn, uint64_t cwnd, uint64_t minCwndInMss, diff --git a/quic/congestion_control/Copa.cpp b/quic/congestion_control/Copa.cpp index 73f66824a..a0abbfd55 100644 --- a/quic/congestion_control/Copa.cpp +++ b/quic/congestion_control/Copa.cpp @@ -262,7 +262,9 @@ void Copa::onPacketAcked(const AckEvent& ack) { cwndBytes_ - conn_.transportSettings.minCwndInMss * conn_.udpSendPacketLen)); } - updatePacing(); + if (conn_.pacer) { + conn_.pacer->refreshPacingRate(cwndBytes_ * 2, conn_.lossState.srtt); + } } void Copa::onPacketLoss(const LossEvent& loss) { @@ -285,7 +287,9 @@ void Copa::onPacketLoss(const LossEvent& loss) { bytesInFlight_, getCongestionWindow(), kPersistentCongestion); } cwndBytes_ = conn_.transportSettings.minCwndInMss * conn_.udpSendPacketLen; - updatePacing(); + if (conn_.pacer) { + conn_.pacer->refreshPacingRate(cwndBytes_ * 2, conn_.lossState.srtt); + } } } @@ -311,38 +315,10 @@ CongestionControlType Copa::type() const noexcept { void Copa::setConnectionEmulation(uint8_t) noexcept {} -void Copa::updatePacing() noexcept { - std::tie(pacingInterval_, pacingBurstSize_) = calculatePacingRate( - conn_, - cwndBytes_ * 2, - conn_.transportSettings.minCwndInMss, - conn_.lossState.srtt); - if (pacingInterval_ == std::chrono::milliseconds::zero()) { - return; - } - if (conn_.transportSettings.pacingEnabled) { - VLOG(10) << "updatePacing pacingInterval_ = " << pacingInterval_.count() - << ", pacingBurstSize_ " << pacingBurstSize_ << " " << conn_; - if (conn_.qLogger) { - conn_.qLogger->addPacingMetricUpdate(pacingBurstSize_, pacingInterval_); - } - } -} - uint64_t Copa::getBytesInFlight() const noexcept { return bytesInFlight_; } -uint64_t Copa::getPacingRate(TimePoint /* currentTime */) noexcept { - return pacingBurstSize_; -} - -void Copa::markPacerTimeoutScheduled(TimePoint /* currentTime*/) noexcept {} - -std::chrono::microseconds Copa::getPacingInterval() const noexcept { - return pacingInterval_; -} - void Copa::setAppIdle(bool, TimePoint) noexcept { /* unsupported */ } diff --git a/quic/congestion_control/Copa.h b/quic/congestion_control/Copa.h index 96eaab5f8..2bee46a5e 100644 --- a/quic/congestion_control/Copa.h +++ b/quic/congestion_control/Copa.h @@ -45,19 +45,11 @@ class Copa : public CongestionController { void setConnectionEmulation(uint8_t) noexcept override; void setAppIdle(bool, TimePoint) noexcept override; void setAppLimited() override; - - uint64_t getPacingRate(TimePoint currentTime) noexcept override; - - void markPacerTimeoutScheduled(TimePoint currentTime) noexcept override; - - std::chrono::microseconds getPacingInterval() const noexcept override; - bool isAppLimited() const noexcept override; private: void onPacketAcked(const AckEvent&); void onPacketLoss(const LossEvent&); - void updatePacing() noexcept; struct VelocityState { uint64_t velocity{1}; @@ -106,8 +98,5 @@ class Copa : public CongestionController { * it will minimize delay at expense of throughput. */ double latencyFactor_{0.50}; - - uint64_t pacingBurstSize_{0}; - std::chrono::microseconds pacingInterval_{0us}; }; } // namespace quic diff --git a/quic/congestion_control/NewReno.cpp b/quic/congestion_control/NewReno.cpp index 736889922..e192e9e0f 100644 --- a/quic/congestion_control/NewReno.cpp +++ b/quic/congestion_control/NewReno.cpp @@ -96,6 +96,7 @@ void NewReno::onPacketAckOrLoss( if (ackEvent && ackEvent->largestAckedPacket.hasValue()) { onAckEvent(*ackEvent); } + // TODO: Pacing isn't supported with NewReno } void NewReno::onPacketLoss(const LossEvent& loss) { @@ -165,21 +166,6 @@ uint64_t NewReno::getBytesInFlight() const noexcept { return bytesInFlight_; } -uint64_t NewReno::getPacingRate(TimePoint /* currentTime */) noexcept { - // Pacing is not supported on NewReno currently - return conn_.transportSettings.writeConnectionDataPacketsLimit; -} - -void NewReno::markPacerTimeoutScheduled(TimePoint /* currentTime */) noexcept { - // Pacing is not supported on NewReno currently -} - -std::chrono::microseconds NewReno::getPacingInterval() const noexcept { - // Pacing is not supported on NewReno currently - return std::chrono::microseconds( - folly::HHWheelTimerHighRes::DEFAULT_TICK_INTERVAL); -} - void NewReno::setAppIdle(bool, TimePoint) noexcept { /* unsupported */ } diff --git a/quic/congestion_control/NewReno.h b/quic/congestion_control/NewReno.h index 5fe457539..6718b9807 100644 --- a/quic/congestion_control/NewReno.h +++ b/quic/congestion_control/NewReno.h @@ -35,12 +35,6 @@ class NewReno : public CongestionController { uint64_t getBytesInFlight() const noexcept; - uint64_t getPacingRate(TimePoint currentTime) noexcept override; - - void markPacerTimeoutScheduled(TimePoint currentTime) noexcept override; - - std::chrono::microseconds getPacingInterval() const noexcept override; - bool isAppLimited() const noexcept override; private: diff --git a/quic/congestion_control/Pacer.cpp b/quic/congestion_control/Pacer.cpp index 67e222340..9d4281316 100644 --- a/quic/congestion_control/Pacer.cpp +++ b/quic/congestion_control/Pacer.cpp @@ -9,6 +9,7 @@ #include #include +#include namespace quic { @@ -18,11 +19,18 @@ DefaultPacer::DefaultPacer( : conn_(conn), minCwndInMss_(minCwndInMss), batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), - pacingRateCalculator_(calculatePacingRateWrapper) {} + pacingRateCalculator_(calculatePacingRate) {} void DefaultPacer::refreshPacingRate( uint64_t cwndBytes, std::chrono::microseconds rtt) { + SCOPE_EXIT { + if (conn_.qLogger) { + conn_.qLogger->addPacingMetricUpdate(batchSize_, writeInterval_); + } + QUIC_TRACE( + pacing_update, conn_, writeInterval_.count(), (uint64_t)batchSize_); + }; if (rtt < conn_.transportSettings.pacingTimerTickInterval) { writeInterval_ = 0us; batchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit; diff --git a/quic/congestion_control/QuicCubic.cpp b/quic/congestion_control/QuicCubic.cpp index 86d2bca5c..cc50ceba4 100644 --- a/quic/congestion_control/QuicCubic.cpp +++ b/quic/congestion_control/QuicCubic.cpp @@ -109,7 +109,10 @@ void Cubic::onPacketLoss(const LossEvent& loss) { state_ = CubicStates::FastRecovery; } ssthresh_ = cwndBytes_; - updatePacing(); + if (conn_.pacer) { + conn_.pacer->refreshPacingRate( + cwndBytes_ * pacingGain(), conn_.lossState.srtt); + } QUIC_TRACE( cubic_loss, conn_, @@ -412,7 +415,10 @@ void Cubic::onPacketAcked(const AckEvent& ack) { onPacketAckedInRecovery(ack); break; } - updatePacing(); + if (conn_.pacer) { + conn_.pacer->refreshPacingRate( + cwndBytes_ * pacingGain(), conn_.lossState.srtt); + } if (cwndBytes_ == currentCwnd) { QUIC_TRACE(fst_trace, conn_, "cwnd_no_change", quiescenceStart_.hasValue()); if (conn_.qLogger) { @@ -486,30 +492,6 @@ Cubic::CubicBuilder& Cubic::CubicBuilder::setPacingSpreadAcrossRtt( return *this; } -uint64_t Cubic::getPacingRate(TimePoint currentTime) noexcept { - // TODO: if this is the first query since the last time inflight reaches 0, we - // should give a larger burst size. - uint64_t extraBurstToken = 0; - if (scheduledNextWriteTime_.hasValue() && - currentTime > *scheduledNextWriteTime_) { - auto timerDrift = currentTime - *scheduledNextWriteTime_; - extraBurstToken = - std::ceil(timerDrift / pacingInterval_) * pacingBurstSize_; - } - scheduledNextWriteTime_.clear(); - return std::min( - conn_.transportSettings.maxBurstPackets, - pacingBurstSize_ + extraBurstToken); -} - -void Cubic::markPacerTimeoutScheduled(TimePoint currentTime) noexcept { - scheduledNextWriteTime_ = currentTime + pacingInterval_; -} - -std::chrono::microseconds Cubic::getPacingInterval() const noexcept { - return pacingInterval_; -} - float Cubic::pacingGain() const noexcept { double pacingGain = 1.0f; if (state_ == CubicStates::Hystart) { @@ -520,30 +502,6 @@ float Cubic::pacingGain() const noexcept { return pacingGain; } -void Cubic::updatePacing() noexcept { - std::tie(pacingInterval_, pacingBurstSize_) = calculatePacingRate( - conn_, - cwndBytes_ * pacingGain(), - conn_.transportSettings.minCwndInMss, - conn_.lossState.srtt); - if (pacingInterval_ == std::chrono::milliseconds::zero()) { - return; - } - if (!spreadAcrossRtt_) { - pacingInterval_ = conn_.transportSettings.pacingTimerTickInterval; - } - if (conn_.transportSettings.pacingEnabled) { - if (conn_.qLogger) { - conn_.qLogger->addPacingMetricUpdate(pacingBurstSize_, pacingInterval_); - } - QUIC_TRACE( - pacing_update, - conn_, - pacingInterval_.count(), - (uint64_t)pacingBurstSize_); - } -} - void Cubic::onPacketAckedInHystart(const AckEvent& ack) { if (!hystartState_.inRttRound) { startHystartRttRound(ack.ackTime); diff --git a/quic/congestion_control/QuicCubic.h b/quic/congestion_control/QuicCubic.h index c6e77b74d..0b9da4088 100644 --- a/quic/congestion_control/QuicCubic.h +++ b/quic/congestion_control/QuicCubic.h @@ -103,12 +103,6 @@ class Cubic : public CongestionController { CongestionControlType type() const noexcept override; - void markPacerTimeoutScheduled(TimePoint currentTime) noexcept override; - - std::chrono::microseconds getPacingInterval() const noexcept override; - - uint64_t getPacingRate(TimePoint currentTime) noexcept override; - protected: CubicStates state_{CubicStates::Hystart}; @@ -124,7 +118,6 @@ class Cubic : public CongestionController { void onPersistentCongestion(); float pacingGain() const noexcept; - void updatePacing() noexcept; void startHystartRttRound(TimePoint time) noexcept; @@ -204,13 +197,6 @@ class Cubic : public CongestionController { // evenly across an RTT. Otherwise, we will use the first N number of pacing // intervals to send all N bursts. bool spreadAcrossRtt_{false}; - // Pacing interval. One rtt is split into multiple inverals when pacing is on - // and the spreadAcrossRtt_ option it set. - std::chrono::microseconds pacingInterval_{0}; - // Pacing burst size - uint8_t pacingBurstSize_{0}; - - folly::Optional scheduledNextWriteTime_; }; folly::StringPiece cubicStateToString(CubicStates state); diff --git a/quic/congestion_control/test/BbrTest.cpp b/quic/congestion_control/test/BbrTest.cpp index 3756df6f3..f9a69dfbf 100644 --- a/quic/congestion_control/test/BbrTest.cpp +++ b/quic/congestion_control/test/BbrTest.cpp @@ -593,116 +593,6 @@ TEST_F(BbrTest, ExtendMinRttExpiration) { folly::none); } -TEST_F(BbrTest, Pacing) { - QuicConnectionStateBase conn(QuicNodeType::Client); - conn.lossState.srtt = 1ms; - conn.udpSendPacketLen = 1000; - conn.transportSettings.maxBurstPackets = std::numeric_limits::max(); - conn.transportSettings.pacingEnabled = true; - conn.transportSettings.pacingTimerTickInterval = 1ms; - auto qLogger = std::make_shared(); - conn.qLogger = qLogger; - - BbrCongestionController::BbrConfig config; - BbrCongestionController bbr(conn, config); - auto mockBandwidthSampler = std::make_unique(); - auto rawBandwidthSampler = mockBandwidthSampler.get(); - bbr.setBandwidthSampler(std::move(mockBandwidthSampler)); - auto mockRttSampler = std::make_unique(); - auto rawRttSampler = mockRttSampler.get(); - bbr.setRttSampler(std::move(mockRttSampler)); - auto expectedMinRtt = 20ms; - EXPECT_CALL(*rawRttSampler, minRtt()).WillRepeatedly(Return(expectedMinRtt)); - // Avoid ProbeRtt during this test case - EXPECT_CALL(*rawRttSampler, minRttExpired()).WillRepeatedly(Return(false)); - - PacketNum currentLatest = 0; - uint64_t totalSent = 0; - Bandwidth mockedBandwidth(2000 * 1000, 1ms); - std::deque> inflightPackets; - auto sendFunc = [&]() { - conn.lossState.largestSent = currentLatest; - auto packet = makeTestingWritePacket( - conn.lossState.largestSent, 1000, totalSent + 1000, false); - bbr.onPacketSent(packet); - inflightPackets.push_back(std::make_pair(currentLatest, packet.time)); - totalSent += 1000; - currentLatest++; - }; - auto ackAndGrow = [&](PacketNum packetToAck, TimePoint sentTime) { - EXPECT_CALL(*rawBandwidthSampler, getBandwidth()) - .WillRepeatedly(Return(mockedBandwidth)); - bbr.onPacketAckOrLoss( - makeAck(packetToAck, 1000, Clock::now(), sentTime), folly::none); - conn.lossState.totalBytesAcked += 1000; - }; - - auto sendAckGrow = [&](TimePoint ackTime) { - conn.lossState.largestSent = currentLatest; - auto packet = makeTestingWritePacket( - conn.lossState.largestSent, - 1000, - totalSent + 1000, - false, - ackTime - 1us); - bbr.onPacketSent(packet); - totalSent += 1000; - EXPECT_CALL(*rawBandwidthSampler, getBandwidth()) - .WillRepeatedly(Return(mockedBandwidth)); - // Make sure when we take Clock::now() inside updateRoundTripCounter, it - // will be later than ackTime - 1us when ackTime is Clock::now(). - std::this_thread::sleep_for(1us); - bbr.onPacketAckOrLoss( - makeAck(currentLatest, 1000, ackTime, packet.time), folly::none); - conn.lossState.totalBytesAcked += 1000; - currentLatest++; - }; - - // Take it to ProbeBw first - std::vector pacingRateVec; - for (uint32_t i = 0; i < kStartupSlowGrowRoundLimit + 2; i++) { - sendAckGrow(Clock::now()); - pacingRateVec.push_back(bbr.getPacingRate(Clock::now())); - } - - std::vector indices = - getQLogEventIndices(QLogEventType::PacingMetricUpdate, qLogger); - EXPECT_EQ(indices.size(), kStartupSlowGrowRoundLimit + 2); - for (uint32_t i = 0; i < kStartupSlowGrowRoundLimit + 2; i++) { - auto tmp = std::move(qLogger->logs[indices[i]]); - auto event = dynamic_cast(tmp.get()); - EXPECT_EQ(event->pacingBurstSize, pacingRateVec[i]); - EXPECT_EQ(event->pacingInterval, bbr.getPacingInterval()); - } - - for (size_t i = 0; i < 5; i++) { - sendFunc(); - } - while (inflightPackets.size() && - bbr.state() != BbrCongestionController::BbrState::ProbeBw) { - auto packetToAck = inflightPackets.back(); - ackAndGrow(packetToAck.first, packetToAck.second); - inflightPackets.pop_back(); - } - ASSERT_EQ(BbrCongestionController::BbrState::ProbeBw, bbr.state()); - auto currentTime = Clock::now(); // cycleStart_ is before this TimePoint - - // Throw out a big inflight bytes there. What a hack. - for (size_t i = 0; i < 100000; i++) { - sendFunc(); - } - // Loop through kPacingGainCycles, we will collect 3 different burst sizes - std::set burstSizes; - for (size_t i = 0; i < kNumOfCycles; i++) { - auto nextAckTime = currentTime + expectedMinRtt + 50us; - sendAckGrow(nextAckTime); - burstSizes.insert(bbr.getPacingRate(Clock::now())); - currentTime = nextAckTime; - } - EXPECT_EQ(3, burstSizes.size()); -} - TEST_F(BbrTest, BytesCounting) { QuicConnectionStateBase conn(QuicNodeType::Client); BbrCongestionController::BbrConfig config; diff --git a/quic/congestion_control/test/CongestionControlFunctionsTest.cpp b/quic/congestion_control/test/CongestionControlFunctionsTest.cpp index 671268eb9..231feddb7 100644 --- a/quic/congestion_control/test/CongestionControlFunctionsTest.cpp +++ b/quic/congestion_control/test/CongestionControlFunctionsTest.cpp @@ -26,14 +26,14 @@ TEST_F(CongestionControlFunctionsTest, CalculatePacingRate) { std::chrono::microseconds rtt(1000 * 100); auto result = calculatePacingRate(conn, 50, conn.transportSettings.minCwndInMss, rtt); - EXPECT_EQ(10ms, result.first); - EXPECT_EQ(5, result.second); + EXPECT_EQ(10ms, result.interval); + EXPECT_EQ(5, result.burstSize); conn.transportSettings.pacingTimerTickInterval = 1ms; auto result2 = calculatePacingRate(conn, 300, conn.transportSettings.minCwndInMss, rtt); - EXPECT_EQ(1ms, result2.first); - EXPECT_EQ(3, result2.second); + EXPECT_EQ(1ms, result2.interval); + EXPECT_EQ(3, result2.burstSize); } TEST_F(CongestionControlFunctionsTest, MinPacingRate) { @@ -42,8 +42,8 @@ TEST_F(CongestionControlFunctionsTest, MinPacingRate) { conn.transportSettings.pacingTimerTickInterval = 1ms; auto result = calculatePacingRate( conn, 100, conn.transportSettings.minCwndInMss, 100000us); - EXPECT_EQ(1ms, result.first); - EXPECT_EQ(1, result.second); + EXPECT_EQ(1ms, result.interval); + EXPECT_EQ(1, result.burstSize); } TEST_F(CongestionControlFunctionsTest, SmallCwnd) { @@ -52,8 +52,8 @@ TEST_F(CongestionControlFunctionsTest, SmallCwnd) { conn.transportSettings.pacingTimerTickInterval = 1ms; auto result = calculatePacingRate( conn, 10, conn.transportSettings.minCwndInMss, 100000us); - EXPECT_EQ(10ms, result.first); - EXPECT_EQ(1, result.second); + EXPECT_EQ(10ms, result.interval); + EXPECT_EQ(1, result.burstSize); } TEST_F(CongestionControlFunctionsTest, RttSmallerThanInterval) { @@ -62,9 +62,9 @@ TEST_F(CongestionControlFunctionsTest, RttSmallerThanInterval) { conn.transportSettings.pacingTimerTickInterval = 10ms; auto result = calculatePacingRate(conn, 10, conn.transportSettings.minCwndInMss, 1ms); - EXPECT_EQ(std::chrono::milliseconds::zero(), result.first); + EXPECT_EQ(std::chrono::milliseconds::zero(), result.interval); EXPECT_EQ( - conn.transportSettings.writeConnectionDataPacketsLimit, result.second); + conn.transportSettings.writeConnectionDataPacketsLimit, result.burstSize); } diff --git a/quic/congestion_control/test/CopaTest.cpp b/quic/congestion_control/test/CopaTest.cpp index 2f8972a03..369bdca5d 100644 --- a/quic/congestion_control/test/CopaTest.cpp +++ b/quic/congestion_control/test/CopaTest.cpp @@ -363,7 +363,6 @@ TEST_F(CopaTest, TestVelocity) { now += 100ms; // velocity = 1, direction = 0 copa.onPacketAckOrLoss(createAckEvent(30, packetSize, now), folly::none); - auto logNow = Clock::now(); uint64_t cwndChange = cwndChangeSteadyState(lastCwnd, velocity, packetSize, 0.5, conn); @@ -371,15 +370,6 @@ TEST_F(CopaTest, TestVelocity) { EXPECT_EQ(copa.getCongestionWindow(), lastCwnd + cwndChange); lastCwnd = copa.getCongestionWindow(); - std::vector indices = - getQLogEventIndices(QLogEventType::PacingMetricUpdate, qLogger); - EXPECT_EQ(indices.size(), 3); - - auto tmp = std::move(qLogger->logs[indices[2]]); - auto event = dynamic_cast(tmp.get()); - EXPECT_EQ(event->pacingBurstSize, copa.getPacingRate(logNow)); - EXPECT_EQ(event->pacingInterval, copa.getPacingInterval()); - // another ack, velocity = 1, direction 0 -> 1 now += 100ms; copa.onPacketAckOrLoss(createAckEvent(35, packetSize, now), folly::none); diff --git a/quic/congestion_control/test/CubicTest.cpp b/quic/congestion_control/test/CubicTest.cpp index ba1fffaf0..72c282891 100644 --- a/quic/congestion_control/test/CubicTest.cpp +++ b/quic/congestion_control/test/CubicTest.cpp @@ -9,6 +9,7 @@ #include #include #include +#include using namespace testing; @@ -249,41 +250,51 @@ TEST_F(CubicTest, AppIdle) { TEST_F(CubicTest, PacingGain) { QuicConnectionStateBase conn(QuicNodeType::Client); conn.transportSettings.pacingTimerTickInterval = 1ms; + auto mockPacer = std::make_unique(); + auto rawPacer = mockPacer.get(); + conn.pacer = std::move(mockPacer); auto qLogger = std::make_shared(); conn.qLogger = qLogger; - conn.udpSendPacketLen = 1500; Cubic cubic(conn); + conn.lossState.srtt = 3000us; auto packet = makeTestingWritePacket(0, 1500, 1500); cubic.onPacketSent(packet); + EXPECT_CALL(*rawPacer, refreshPacingRate(_, _)) + .Times(1) + .WillOnce(Invoke([&](uint64_t cwndBytes, std::chrono::microseconds) { + EXPECT_EQ(cubic.getCongestionWindow() * 2, cwndBytes); + })); cubic.onPacketAckOrLoss( makeAck(0, 1500, Clock::now(), packet.time), folly::none); EXPECT_EQ(CubicStates::Hystart, cubic.state()); - // 11 * 2 / (3 / 1), then take ceil - EXPECT_EQ(1ms, cubic.getPacingInterval()); - EXPECT_EQ(8, cubic.getPacingRate(Clock::now())); auto packet1 = makeTestingWritePacket(1, 1500, 3000); cubic.onPacketSent(packet1); CongestionController::LossEvent loss; loss.addLostPacket(packet1); // reduce cwnd to 9 MSS + EXPECT_CALL(*rawPacer, refreshPacingRate(_, _)) + .Times(1) + .WillOnce(Invoke([&](uint64_t cwndBytes, std::chrono::microseconds) { + EXPECT_EQ( + static_cast(cubic.getCongestionWindow() * 1.25), + cwndBytes); + })); cubic.onPacketAckOrLoss(folly::none, loss); EXPECT_EQ(CubicStates::FastRecovery, cubic.state()); - // 9 * 1.25 / (3 / 1) then take ceil - EXPECT_EQ(1ms, cubic.getPacingInterval()); - EXPECT_EQ(4, cubic.getPacingRate(Clock::now())); auto packet2 = makeTestingWritePacket(2, 1500, 4500); cubic.onPacketSent(packet2); + EXPECT_CALL(*rawPacer, refreshPacingRate(_, _)) + .Times(1) + .WillOnce(Invoke([&](uint64_t cwndBytes, std::chrono::microseconds) { + EXPECT_EQ(cubic.getCongestionWindow(), cwndBytes); + })); cubic.onPacketAckOrLoss( makeAck(2, 1500, Clock::now(), packet2.time), folly::none); EXPECT_EQ(CubicStates::Steady, cubic.state()); - // Cwnd should still be very close to 9 mss - // 9 / (3 / 1) - EXPECT_EQ(1ms, cubic.getPacingInterval()); - EXPECT_NEAR(3, cubic.getPacingRate(Clock::now()), 1); std::vector indices = getQLogEventIndices(QLogEventType::TransportStateUpdate, qLogger); @@ -292,52 +303,5 @@ TEST_F(CubicTest, PacingGain) { auto event = dynamic_cast(tmp.get()); EXPECT_EQ(event->update, kRecalculateTimeToOrigin); } - -TEST_F(CubicTest, PacingSpread) { - QuicConnectionStateBase conn(QuicNodeType::Client); - conn.transportSettings.pacingTimerTickInterval = 1ms; - conn.lossState.srtt = 60ms; - conn.udpSendPacketLen = 1500; - Cubic::CubicBuilder builder; - builder.setPacingSpreadAcrossRtt(true); - auto cubic = builder.build(conn); - - for (size_t i = 0; i < 5; i++) { - auto packet = makeTestingWritePacket(i, 1500, 4500 + 1500 * (1 + i)); - cubic->onPacketSent(packet); - cubic->onPacketAckOrLoss( - makeAck(i, 1500, Clock::now(), packet.time), folly::none); - } - ASSERT_EQ(1500 * 15, cubic->getCongestionWindow()); - EXPECT_EQ(1, cubic->getPacingRate(Clock::now())); - EXPECT_EQ(2ms, cubic->getPacingInterval()); -} - -TEST_F(CubicTest, LatePacingTimer) { - QuicConnectionStateBase conn(QuicNodeType::Client); - conn.transportSettings.pacingTimerTickInterval = 1ms; - conn.lossState.srtt = 50ms; - Cubic cubic(conn); - auto packet = - makeTestingWritePacket(0, conn.udpSendPacketLen, conn.udpSendPacketLen); - cubic.onPacketSent(packet); - cubic.onPacketAckOrLoss( - makeAck(0, conn.udpSendPacketLen, Clock::now(), packet.time), - folly::none); - - auto currentTime = Clock::now(); - auto pacingRateWithoutCompensation = cubic.getPacingRate(currentTime); - cubic.markPacerTimeoutScheduled(currentTime); - auto pacingRateWithCompensation = cubic.getPacingRate(currentTime + 50ms); - EXPECT_GT(pacingRateWithCompensation, pacingRateWithoutCompensation); - - // No matter how late it comes, you cannot go beyond the max limit - auto veryLatePacingRate = cubic.getPacingRate(currentTime + 100s); - EXPECT_GE(conn.transportSettings.maxBurstPackets, veryLatePacingRate); - - // But if you call getPacingRate again, it won't have compensation - auto pacingRateAgain = cubic.getPacingRate(currentTime + 50ms); - EXPECT_LT(pacingRateAgain, pacingRateWithCompensation); -} } // namespace test } // namespace quic diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 874cb82e7..cf4401f97 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -187,7 +187,7 @@ void QuicServerTransport::writeData() { uint64_t packetLimit = (isConnectionPaced(*conn_) - ? conn_->congestionController->getPacingRate(Clock::now()) + ? conn_->pacer->updateAndGetWriteBatchSize(Clock::now()) : conn_->transportSettings.writeConnectionDataPacketsLimit); CryptoStreamScheduler initialScheduler( *conn_, *getCryptoStream(*conn_->cryptoState, EncryptionLevel::Initial)); diff --git a/quic/state/QuicStateFunctions.cpp b/quic/state/QuicStateFunctions.cpp index fd2aff7aa..c776fcab4 100644 --- a/quic/state/QuicStateFunctions.cpp +++ b/quic/state/QuicStateFunctions.cpp @@ -147,8 +147,7 @@ void updateAckSendStateOnSentPacketWithAcks( bool isConnectionPaced(const QuicConnectionStateBase& conn) noexcept { return ( - conn.transportSettings.pacingEnabled && conn.canBePaced && - conn.congestionController); + conn.transportSettings.pacingEnabled && conn.canBePaced && conn.pacer); } AckState& getAckState( diff --git a/quic/state/StateData.h b/quic/state/StateData.h index eaa84ff73..a76cfd011 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -284,19 +284,6 @@ struct CongestionController { virtual void setAppLimited() = 0; virtual CongestionControlType type() const = 0; - /** - * Return pacing burst size. - */ - virtual uint64_t getPacingRate(TimePoint currentTime) = 0; - - virtual std::chrono::microseconds getPacingInterval() const = 0; - - /** - * Mark the time the transport schedules a pacing write. CongestionController - * needs to know this to compensate late time fires. - */ - virtual void markPacerTimeoutScheduled(TimePoint currentTime) = 0; - /** * Whether the congestion controller thinks it's currently in app-limited * state. @@ -423,6 +410,9 @@ struct QuicConnectionStateBase { // Connection Congestion controller std::unique_ptr congestionController; + // Pacer + std::unique_ptr pacer; + // Congestion Controller factory to create specific impl of cc algorithm std::shared_ptr congestionControllerFactory; diff --git a/quic/state/test/Mocks.h b/quic/state/test/Mocks.h index ebeeaa58f..99de116e8 100644 --- a/quic/state/test/Mocks.h +++ b/quic/state/test/Mocks.h @@ -27,12 +27,17 @@ class MockCongestionController : public CongestionController { MOCK_METHOD0(onSpuriousLoss, void()); GMOCK_METHOD1_(, , , setConnectionEmulation, void(uint8_t)); MOCK_CONST_METHOD0(type, CongestionControlType()); - GMOCK_METHOD1_(, , , getPacingRate, uint64_t(TimePoint)); - GMOCK_METHOD1_(, , , markPacerTimeoutScheduled, void(TimePoint)); - MOCK_CONST_METHOD0(getPacingInterval, std::chrono::microseconds()); GMOCK_METHOD2_(, , , setAppIdle, void(bool, TimePoint)); MOCK_METHOD0(setAppLimited, void()); MOCK_CONST_METHOD0(isAppLimited, bool()); }; + +class MockPacer : public Pacer { + public: + MOCK_METHOD2(refreshPacingRate, void(uint64_t, std::chrono::microseconds)); + MOCK_METHOD1(onPacedWriteScheduled, void(TimePoint)); + MOCK_CONST_METHOD0(getTimeUntilNextWrite, std::chrono::microseconds()); + MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint)); +}; } // namespace test } // namespace quic