1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Replace pacing related callsites with new Pacer interface

Summary:
Use the new Pacer interface in the transport where we currently
directly use CongestinoController interrace for paciner related APIs.

Reviewed By: mjoras

Differential Revision: D16918672

fbshipit-source-id: 410f72d567e71bdd3279e344f6e9abf5e132518e
This commit is contained in:
Yang Chi
2019-08-30 19:29:18 -07:00
committed by Facebook Github Bot
parent b61452211e
commit c0a659a30a
23 changed files with 118 additions and 408 deletions

View File

@@ -240,6 +240,9 @@ constexpr size_t kBlockedSizeBytes = 20;
constexpr uint64_t kInitCwndInMss = 10;
constexpr uint64_t kMinCwndInMss = 2;
// Min cwnd for BBR is 4 MSS regard less of transport settings
constexpr uint64_t kMinCwndInMssForBbr{4};
constexpr uint64_t kDefaultMaxCwndInMss = 2000;
// When server receives early data attempt without valid source address token,
// server will limit bytes in flight to avoid amplification attack until CFIN

View File

@@ -12,6 +12,7 @@
#include <quic/api/LoopDetectorCallback.h>
#include <quic/api/QuicTransportFunctions.h>
#include <quic/common/TimeUtil.h>
#include <quic/congestion_control/Pacer.h>
#include <quic/logging/QLoggerConstants.h>
#include <quic/loss/QuicLossFunctions.h>
#include <quic/state/QuicPacingFunctions.h>
@@ -47,8 +48,8 @@ QuicTransportBase::QuicTransportBase(
LooperType::WriteLooper)) {
writeLooper_->setPacingFunction([this]() -> auto {
if (isConnectionPaced(*conn_)) {
conn_->congestionController->markPacerTimeoutScheduled(Clock::now());
return conn_->congestionController->getPacingInterval();
conn_->pacer->onPacedWriteScheduled(Clock::now());
return conn_->pacer->getTimeUntilNextWrite();
}
return 0us;
});
@@ -479,13 +480,11 @@ QuicSocket::TransportInfo QuicTransportBase::getTransportInfo() const {
if (conn_->congestionController) {
writableBytes = conn_->congestionController->getWritableBytes();
congestionWindow = conn_->congestionController->getCongestionWindow();
// Do not collect pacing stats for Cubic, since getPacingRate() call
// modifies some internal state. TODO(yangchi): Remove this check after
// changing Cubic implementation.
if (conn_->congestionController->type() != CongestionControlType::Cubic &&
isConnectionPaced(*conn_)) {
burstSize = conn_->congestionController->getPacingRate(Clock::now());
pacingInterval = conn_->congestionController->getPacingInterval();
// TODO: This is bad, we need an API that get without update pacing rate
burstSize = conn_->pacer->updateAndGetWriteBatchSize(Clock::now());
pacingInterval = conn_->pacer->getTimeUntilNextWrite();
}
}
TransportInfo transportInfo;
@@ -2290,6 +2289,14 @@ void QuicTransportBase::setTransportSettings(
TransportSettings transportSettings) {
conn_->transportSettings = std::move(transportSettings);
setCongestionControl(transportSettings.defaultCongestionController);
if (conn_->transportSettings.pacingEnabled) {
conn_->pacer = std::make_unique<DefaultPacer>(
*conn_,
transportSettings.defaultCongestionController ==
CongestionControlType::BBR
? kMinCwndInMssForBbr
: conn_->transportSettings.minCwndInMss);
}
}
const TransportSettings& QuicTransportBase::getTransportSettings() const {

View File

@@ -92,7 +92,7 @@ class TestQuicTransport
*headerCipher,
getVersion(),
(isConnectionPaced(*conn_)
? conn_->congestionController->getPacingRate(Clock::now())
? conn_->pacer->updateAndGetWriteBatchSize(Clock::now())
: conn_->transportSettings.writeConnectionDataPacketsLimit));
}
@@ -2466,6 +2466,9 @@ TEST_F(QuicTransportTest, PacingWillBurstFirst) {
conn.congestionController = std::move(mockCongestionController);
conn.transportSettings.pacingEnabled = true;
conn.canBePaced = true;
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn.pacer = std::move(mockPacer);
EXPECT_CALL(*rawCongestionController, getWritableBytes())
.WillRepeatedly(Return(100));
@@ -2473,7 +2476,7 @@ TEST_F(QuicTransportTest, PacingWillBurstFirst) {
auto streamId = transport_->createBidirectionalStream().value();
transport_->writeChain(streamId, buf->clone(), false, false);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
EXPECT_CALL(*rawCongestionController, getPacingRate(_))
EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
.WillRepeatedly(Return(1));
transport_->pacedWrite(true);
}
@@ -2486,6 +2489,9 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) {
conn.congestionController = std::move(mockCongestionController);
conn.transportSettings.pacingEnabled = true;
conn.canBePaced = true;
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn.pacer = std::move(mockPacer);
EXPECT_CALL(*rawCongestionController, getWritableBytes())
.WillRepeatedly(Return(100));
@@ -2493,10 +2499,10 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) {
auto streamId = transport_->createBidirectionalStream().value();
transport_->writeChain(streamId, buf->clone(), false, false);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
EXPECT_CALL(*rawCongestionController, getPacingRate(_))
EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
.WillRepeatedly(Return(1));
EXPECT_CALL(*rawCongestionController, markPacerTimeoutScheduled(_));
EXPECT_CALL(*rawCongestionController, getPacingInterval())
EXPECT_CALL(*rawPacer, onPacedWriteScheduled(_));
EXPECT_CALL(*rawPacer, getTimeUntilNextWrite())
.WillRepeatedly(Return(3600000ms));
// This will write out 100 bytes, leave 100 bytes behind. FunctionLooper will
// schedule a pacing timeout.
@@ -2515,6 +2521,9 @@ TEST_F(QuicTransportTest, NoScheduleIfNoNewData) {
conn.congestionController = std::move(mockCongestionController);
conn.transportSettings.pacingEnabled = true;
conn.canBePaced = true;
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn.pacer = std::move(mockPacer);
EXPECT_CALL(*rawCongestionController, getWritableBytes())
.WillRepeatedly(Return(1000));
@@ -2522,7 +2531,7 @@ TEST_F(QuicTransportTest, NoScheduleIfNoNewData) {
auto streamId = transport_->createBidirectionalStream().value();
transport_->writeChain(streamId, buf->clone(), false, false);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
EXPECT_CALL(*rawCongestionController, getPacingRate(_))
EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
.WillRepeatedly(Return(1));
// This will write out everything. After that because there is no new data,
// FunctionLooper won't schedule a pacing timeout.

View File

@@ -723,7 +723,7 @@ void QuicClientTransport::writeData() {
uint64_t packetLimit =
(isConnectionPaced(*conn_)
? conn_->congestionController->getPacingRate(Clock::now())
? conn_->pacer->updateAndGetWriteBatchSize(Clock::now())
: conn_->transportSettings.writeConnectionDataPacketsLimit);
CryptoStreamScheduler initialScheduler(
*conn_, *getCryptoStream(*conn_->cryptoState, EncryptionLevel::Initial));

View File

@@ -251,17 +251,15 @@ void BbrCongestionController::onPacketAcked(
// canBePaced function. Now this function is gone, maybe we need to change this
// updatePacing function.
void BbrCongestionController::updatePacing() noexcept {
// TODO: enable Pacing and BBR together.
if (!conn_.pacer) {
return;
}
auto bandwidthEstimate = bandwidth();
if (!bandwidthEstimate) {
return;
}
auto mrtt = minRtt();
if (mrtt == 0us || mrtt < conn_.transportSettings.pacingTimerTickInterval) {
return;
}
// TODO(t40615081, yangchi) cloning Handshake packets make this better
VLOG_IF(10, conn_.lossState.srtt != 0us)
<< "no reliable srtt sample, " << *this;
uint64_t targetPacingWindow = bandwidthEstimate * pacingGain_ * mrtt;
if (btlbwFound_) {
pacingWindow_ = targetPacingWindow;
@@ -276,12 +274,7 @@ void BbrCongestionController::updatePacing() noexcept {
pacingWindow_ = std::max(pacingWindow_, targetPacingWindow);
}
// TODO: slower pacing if we are in STARTUP and loss has happened
std::tie(pacingInterval_, pacingBurstSize_) =
calculatePacingRate(conn_, pacingWindow_, kMinCwndInMssForBbr, mrtt);
if (conn_.transportSettings.pacingEnabled && conn_.qLogger) {
conn_.qLogger->addPacingMetricUpdate(pacingBurstSize_, pacingInterval_);
}
conn_.pacer->refreshPacingRate(pacingWindow_, mrtt);
}
void BbrCongestionController::handleAckInProbeBw(
@@ -582,20 +575,6 @@ void BbrCongestionController::onRemoveBytesFromInflight(
subtractAndCheckUnderflow(inflightBytes_, bytesToRemove);
}
uint64_t BbrCongestionController::getPacingRate(
TimePoint /* currentTime */) noexcept {
return pacingBurstSize_;
}
std::chrono::microseconds BbrCongestionController::getPacingInterval() const
noexcept {
return pacingInterval_;
}
void BbrCongestionController::markPacerTimeoutScheduled(TimePoint) noexcept {
/* This API is going away */
}
std::string bbrStateToString(BbrCongestionController::BbrState state) {
switch (state) {
case BbrCongestionController::BbrState::Startup:

View File

@@ -40,8 +40,6 @@ constexpr uint64_t kBandwidthWindowLength = kNumOfCycles + 2;
constexpr std::chrono::seconds kDefaultRttSamplerExpiration{10};
// See calculateReductionFactors in QuicCubic.cpp
constexpr double kBbrReductionFactor = 0.9;
// Min cwnd for BBR is 4 MSS regard less of transport settings
constexpr uint64_t kMinCwndInMssForBbr{4};
struct Bandwidth {
uint64_t bytes;
@@ -200,10 +198,6 @@ class BbrCongestionController : public CongestionController {
bool isAppLimited() const noexcept override;
uint64_t getPacingRate(TimePoint currentTime) noexcept override;
std::chrono::microseconds getPacingInterval() const noexcept override;
void markPacerTimeoutScheduled(TimePoint) noexcept override;
// TODO: some of these do not have to be in public API.
bool inRecovery() const noexcept;
BbrState state() const noexcept;
@@ -216,6 +210,7 @@ class BbrCongestionController : public CongestionController {
void
onPacketAcked(const AckEvent& ack, uint64_t prevInflightBytes, bool hasLoss);
void onPacketLoss(const LossEvent&);
void updatePacing() noexcept;
/**
* Update the ack aggregation states
@@ -264,7 +259,6 @@ class BbrCongestionController : public CongestionController {
uint64_t calculateTargetCwnd(float gain) const noexcept;
void updateCwnd(uint64_t ackedBytes, uint64_t excessiveBytes) noexcept;
void updatePacing() noexcept;
std::chrono::microseconds minRtt() const noexcept;
Bandwidth bandwidth() const noexcept;
@@ -291,8 +285,6 @@ class BbrCongestionController : public CongestionController {
uint64_t inflightBytes_{0};
// Number of bytes we expect to send over on RTT when paced write.
uint64_t pacingWindow_{0};
uint64_t pacingBurstSize_{0};
std::chrono::microseconds pacingInterval_{0us};
float cwndGain_{kStartupGain};
float pacingGain_{kStartupGain};

View File

@@ -24,27 +24,17 @@ uint64_t boundedCwnd(
minCwndInMss * packetLength);
}
PacingRate calculatePacingRateWrapper(
const QuicConnectionStateBase& conn,
uint64_t cwnd,
uint64_t minCwndInMss,
std::chrono::microseconds rtt) {
auto result = calculatePacingRate(conn, cwnd, minCwndInMss, rtt);
return PacingRate::Builder()
.setInterval(result.first)
.setBurstSize(result.second)
.build();
}
std::pair<std::chrono::microseconds, uint64_t> calculatePacingRate(
PacingRate calculatePacingRate(
const QuicConnectionStateBase& conn,
uint64_t cwnd,
uint64_t minCwndInMss,
std::chrono::microseconds rtt) {
if (conn.transportSettings.pacingTimerTickInterval > rtt) {
// We cannot really pace in this case.
return std::make_pair(
0us, conn.transportSettings.writeConnectionDataPacketsLimit);
return PacingRate::Builder()
.setInterval(0us)
.setBurstSize(conn.transportSettings.writeConnectionDataPacketsLimit)
.build();
}
uint64_t cwndInPackets = std::max(minCwndInMss, cwnd / conn.udpSendPacketLen);
// Each interval we want to send cwndInpackets / (rtt / minimalInverval)
@@ -59,6 +49,9 @@ std::pair<std::chrono::microseconds, uint64_t> calculatePacingRate(
auto interval = timeMax(
conn.transportSettings.pacingTimerTickInterval,
rtt * burstPerInterval / cwndInPackets);
return std::make_pair(interval, burstPerInterval);
return PacingRate::Builder()
.setInterval(interval)
.setBurstSize(burstPerInterval)
.build();
}
} // namespace quic

View File

@@ -21,15 +21,7 @@ uint64_t boundedCwnd(
uint64_t maxCwndInMss,
uint64_t minCwndInMss) noexcept;
std::pair<std::chrono::microseconds, uint64_t> calculatePacingRate(
const QuicConnectionStateBase& conn,
uint64_t cwnd,
uint64_t minCwndInMss,
std::chrono::microseconds rtt);
// TODO: Temporary wrapper to make the transition from CC doing pacing to
// the real pacer easier. Need to remove it after real pacer is used everywhere.
PacingRate calculatePacingRateWrapper(
PacingRate calculatePacingRate(
const QuicConnectionStateBase& conn,
uint64_t cwnd,
uint64_t minCwndInMss,

View File

@@ -262,7 +262,9 @@ void Copa::onPacketAcked(const AckEvent& ack) {
cwndBytes_ -
conn_.transportSettings.minCwndInMss * conn_.udpSendPacketLen));
}
updatePacing();
if (conn_.pacer) {
conn_.pacer->refreshPacingRate(cwndBytes_ * 2, conn_.lossState.srtt);
}
}
void Copa::onPacketLoss(const LossEvent& loss) {
@@ -285,7 +287,9 @@ void Copa::onPacketLoss(const LossEvent& loss) {
bytesInFlight_, getCongestionWindow(), kPersistentCongestion);
}
cwndBytes_ = conn_.transportSettings.minCwndInMss * conn_.udpSendPacketLen;
updatePacing();
if (conn_.pacer) {
conn_.pacer->refreshPacingRate(cwndBytes_ * 2, conn_.lossState.srtt);
}
}
}
@@ -311,38 +315,10 @@ CongestionControlType Copa::type() const noexcept {
void Copa::setConnectionEmulation(uint8_t) noexcept {}
void Copa::updatePacing() noexcept {
std::tie(pacingInterval_, pacingBurstSize_) = calculatePacingRate(
conn_,
cwndBytes_ * 2,
conn_.transportSettings.minCwndInMss,
conn_.lossState.srtt);
if (pacingInterval_ == std::chrono::milliseconds::zero()) {
return;
}
if (conn_.transportSettings.pacingEnabled) {
VLOG(10) << "updatePacing pacingInterval_ = " << pacingInterval_.count()
<< ", pacingBurstSize_ " << pacingBurstSize_ << " " << conn_;
if (conn_.qLogger) {
conn_.qLogger->addPacingMetricUpdate(pacingBurstSize_, pacingInterval_);
}
}
}
uint64_t Copa::getBytesInFlight() const noexcept {
return bytesInFlight_;
}
uint64_t Copa::getPacingRate(TimePoint /* currentTime */) noexcept {
return pacingBurstSize_;
}
void Copa::markPacerTimeoutScheduled(TimePoint /* currentTime*/) noexcept {}
std::chrono::microseconds Copa::getPacingInterval() const noexcept {
return pacingInterval_;
}
void Copa::setAppIdle(bool, TimePoint) noexcept { /* unsupported */
}

View File

@@ -45,19 +45,11 @@ class Copa : public CongestionController {
void setConnectionEmulation(uint8_t) noexcept override;
void setAppIdle(bool, TimePoint) noexcept override;
void setAppLimited() override;
uint64_t getPacingRate(TimePoint currentTime) noexcept override;
void markPacerTimeoutScheduled(TimePoint currentTime) noexcept override;
std::chrono::microseconds getPacingInterval() const noexcept override;
bool isAppLimited() const noexcept override;
private:
void onPacketAcked(const AckEvent&);
void onPacketLoss(const LossEvent&);
void updatePacing() noexcept;
struct VelocityState {
uint64_t velocity{1};
@@ -106,8 +98,5 @@ class Copa : public CongestionController {
* it will minimize delay at expense of throughput.
*/
double latencyFactor_{0.50};
uint64_t pacingBurstSize_{0};
std::chrono::microseconds pacingInterval_{0us};
};
} // namespace quic

View File

@@ -96,6 +96,7 @@ void NewReno::onPacketAckOrLoss(
if (ackEvent && ackEvent->largestAckedPacket.hasValue()) {
onAckEvent(*ackEvent);
}
// TODO: Pacing isn't supported with NewReno
}
void NewReno::onPacketLoss(const LossEvent& loss) {
@@ -165,21 +166,6 @@ uint64_t NewReno::getBytesInFlight() const noexcept {
return bytesInFlight_;
}
uint64_t NewReno::getPacingRate(TimePoint /* currentTime */) noexcept {
// Pacing is not supported on NewReno currently
return conn_.transportSettings.writeConnectionDataPacketsLimit;
}
void NewReno::markPacerTimeoutScheduled(TimePoint /* currentTime */) noexcept {
// Pacing is not supported on NewReno currently
}
std::chrono::microseconds NewReno::getPacingInterval() const noexcept {
// Pacing is not supported on NewReno currently
return std::chrono::microseconds(
folly::HHWheelTimerHighRes::DEFAULT_TICK_INTERVAL);
}
void NewReno::setAppIdle(bool, TimePoint) noexcept { /* unsupported */
}

View File

@@ -35,12 +35,6 @@ class NewReno : public CongestionController {
uint64_t getBytesInFlight() const noexcept;
uint64_t getPacingRate(TimePoint currentTime) noexcept override;
void markPacerTimeoutScheduled(TimePoint currentTime) noexcept override;
std::chrono::microseconds getPacingInterval() const noexcept override;
bool isAppLimited() const noexcept override;
private:

View File

@@ -9,6 +9,7 @@
#include <quic/congestion_control/Pacer.h>
#include <quic/congestion_control/CongestionControlFunctions.h>
#include <quic/logging/QuicLogger.h>
namespace quic {
@@ -18,11 +19,18 @@ DefaultPacer::DefaultPacer(
: conn_(conn),
minCwndInMss_(minCwndInMss),
batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit),
pacingRateCalculator_(calculatePacingRateWrapper) {}
pacingRateCalculator_(calculatePacingRate) {}
void DefaultPacer::refreshPacingRate(
uint64_t cwndBytes,
std::chrono::microseconds rtt) {
SCOPE_EXIT {
if (conn_.qLogger) {
conn_.qLogger->addPacingMetricUpdate(batchSize_, writeInterval_);
}
QUIC_TRACE(
pacing_update, conn_, writeInterval_.count(), (uint64_t)batchSize_);
};
if (rtt < conn_.transportSettings.pacingTimerTickInterval) {
writeInterval_ = 0us;
batchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit;

View File

@@ -109,7 +109,10 @@ void Cubic::onPacketLoss(const LossEvent& loss) {
state_ = CubicStates::FastRecovery;
}
ssthresh_ = cwndBytes_;
updatePacing();
if (conn_.pacer) {
conn_.pacer->refreshPacingRate(
cwndBytes_ * pacingGain(), conn_.lossState.srtt);
}
QUIC_TRACE(
cubic_loss,
conn_,
@@ -412,7 +415,10 @@ void Cubic::onPacketAcked(const AckEvent& ack) {
onPacketAckedInRecovery(ack);
break;
}
updatePacing();
if (conn_.pacer) {
conn_.pacer->refreshPacingRate(
cwndBytes_ * pacingGain(), conn_.lossState.srtt);
}
if (cwndBytes_ == currentCwnd) {
QUIC_TRACE(fst_trace, conn_, "cwnd_no_change", quiescenceStart_.hasValue());
if (conn_.qLogger) {
@@ -486,30 +492,6 @@ Cubic::CubicBuilder& Cubic::CubicBuilder::setPacingSpreadAcrossRtt(
return *this;
}
uint64_t Cubic::getPacingRate(TimePoint currentTime) noexcept {
// TODO: if this is the first query since the last time inflight reaches 0, we
// should give a larger burst size.
uint64_t extraBurstToken = 0;
if (scheduledNextWriteTime_.hasValue() &&
currentTime > *scheduledNextWriteTime_) {
auto timerDrift = currentTime - *scheduledNextWriteTime_;
extraBurstToken =
std::ceil(timerDrift / pacingInterval_) * pacingBurstSize_;
}
scheduledNextWriteTime_.clear();
return std::min(
conn_.transportSettings.maxBurstPackets,
pacingBurstSize_ + extraBurstToken);
}
void Cubic::markPacerTimeoutScheduled(TimePoint currentTime) noexcept {
scheduledNextWriteTime_ = currentTime + pacingInterval_;
}
std::chrono::microseconds Cubic::getPacingInterval() const noexcept {
return pacingInterval_;
}
float Cubic::pacingGain() const noexcept {
double pacingGain = 1.0f;
if (state_ == CubicStates::Hystart) {
@@ -520,30 +502,6 @@ float Cubic::pacingGain() const noexcept {
return pacingGain;
}
void Cubic::updatePacing() noexcept {
std::tie(pacingInterval_, pacingBurstSize_) = calculatePacingRate(
conn_,
cwndBytes_ * pacingGain(),
conn_.transportSettings.minCwndInMss,
conn_.lossState.srtt);
if (pacingInterval_ == std::chrono::milliseconds::zero()) {
return;
}
if (!spreadAcrossRtt_) {
pacingInterval_ = conn_.transportSettings.pacingTimerTickInterval;
}
if (conn_.transportSettings.pacingEnabled) {
if (conn_.qLogger) {
conn_.qLogger->addPacingMetricUpdate(pacingBurstSize_, pacingInterval_);
}
QUIC_TRACE(
pacing_update,
conn_,
pacingInterval_.count(),
(uint64_t)pacingBurstSize_);
}
}
void Cubic::onPacketAckedInHystart(const AckEvent& ack) {
if (!hystartState_.inRttRound) {
startHystartRttRound(ack.ackTime);

View File

@@ -103,12 +103,6 @@ class Cubic : public CongestionController {
CongestionControlType type() const noexcept override;
void markPacerTimeoutScheduled(TimePoint currentTime) noexcept override;
std::chrono::microseconds getPacingInterval() const noexcept override;
uint64_t getPacingRate(TimePoint currentTime) noexcept override;
protected:
CubicStates state_{CubicStates::Hystart};
@@ -124,7 +118,6 @@ class Cubic : public CongestionController {
void onPersistentCongestion();
float pacingGain() const noexcept;
void updatePacing() noexcept;
void startHystartRttRound(TimePoint time) noexcept;
@@ -204,13 +197,6 @@ class Cubic : public CongestionController {
// evenly across an RTT. Otherwise, we will use the first N number of pacing
// intervals to send all N bursts.
bool spreadAcrossRtt_{false};
// Pacing interval. One rtt is split into multiple inverals when pacing is on
// and the spreadAcrossRtt_ option it set.
std::chrono::microseconds pacingInterval_{0};
// Pacing burst size
uint8_t pacingBurstSize_{0};
folly::Optional<TimePoint> scheduledNextWriteTime_;
};
folly::StringPiece cubicStateToString(CubicStates state);

View File

@@ -593,116 +593,6 @@ TEST_F(BbrTest, ExtendMinRttExpiration) {
folly::none);
}
TEST_F(BbrTest, Pacing) {
QuicConnectionStateBase conn(QuicNodeType::Client);
conn.lossState.srtt = 1ms;
conn.udpSendPacketLen = 1000;
conn.transportSettings.maxBurstPackets = std::numeric_limits<decltype(
conn.transportSettings.maxBurstPackets)>::max();
conn.transportSettings.pacingEnabled = true;
conn.transportSettings.pacingTimerTickInterval = 1ms;
auto qLogger = std::make_shared<FileQLogger>();
conn.qLogger = qLogger;
BbrCongestionController::BbrConfig config;
BbrCongestionController bbr(conn, config);
auto mockBandwidthSampler = std::make_unique<MockBandwidthSampler>();
auto rawBandwidthSampler = mockBandwidthSampler.get();
bbr.setBandwidthSampler(std::move(mockBandwidthSampler));
auto mockRttSampler = std::make_unique<MockMinRttSampler>();
auto rawRttSampler = mockRttSampler.get();
bbr.setRttSampler(std::move(mockRttSampler));
auto expectedMinRtt = 20ms;
EXPECT_CALL(*rawRttSampler, minRtt()).WillRepeatedly(Return(expectedMinRtt));
// Avoid ProbeRtt during this test case
EXPECT_CALL(*rawRttSampler, minRttExpired()).WillRepeatedly(Return(false));
PacketNum currentLatest = 0;
uint64_t totalSent = 0;
Bandwidth mockedBandwidth(2000 * 1000, 1ms);
std::deque<std::pair<PacketNum, TimePoint>> inflightPackets;
auto sendFunc = [&]() {
conn.lossState.largestSent = currentLatest;
auto packet = makeTestingWritePacket(
conn.lossState.largestSent, 1000, totalSent + 1000, false);
bbr.onPacketSent(packet);
inflightPackets.push_back(std::make_pair(currentLatest, packet.time));
totalSent += 1000;
currentLatest++;
};
auto ackAndGrow = [&](PacketNum packetToAck, TimePoint sentTime) {
EXPECT_CALL(*rawBandwidthSampler, getBandwidth())
.WillRepeatedly(Return(mockedBandwidth));
bbr.onPacketAckOrLoss(
makeAck(packetToAck, 1000, Clock::now(), sentTime), folly::none);
conn.lossState.totalBytesAcked += 1000;
};
auto sendAckGrow = [&](TimePoint ackTime) {
conn.lossState.largestSent = currentLatest;
auto packet = makeTestingWritePacket(
conn.lossState.largestSent,
1000,
totalSent + 1000,
false,
ackTime - 1us);
bbr.onPacketSent(packet);
totalSent += 1000;
EXPECT_CALL(*rawBandwidthSampler, getBandwidth())
.WillRepeatedly(Return(mockedBandwidth));
// Make sure when we take Clock::now() inside updateRoundTripCounter, it
// will be later than ackTime - 1us when ackTime is Clock::now().
std::this_thread::sleep_for(1us);
bbr.onPacketAckOrLoss(
makeAck(currentLatest, 1000, ackTime, packet.time), folly::none);
conn.lossState.totalBytesAcked += 1000;
currentLatest++;
};
// Take it to ProbeBw first
std::vector<uint64_t> pacingRateVec;
for (uint32_t i = 0; i < kStartupSlowGrowRoundLimit + 2; i++) {
sendAckGrow(Clock::now());
pacingRateVec.push_back(bbr.getPacingRate(Clock::now()));
}
std::vector<int> indices =
getQLogEventIndices(QLogEventType::PacingMetricUpdate, qLogger);
EXPECT_EQ(indices.size(), kStartupSlowGrowRoundLimit + 2);
for (uint32_t i = 0; i < kStartupSlowGrowRoundLimit + 2; i++) {
auto tmp = std::move(qLogger->logs[indices[i]]);
auto event = dynamic_cast<QLogPacingMetricUpdateEvent*>(tmp.get());
EXPECT_EQ(event->pacingBurstSize, pacingRateVec[i]);
EXPECT_EQ(event->pacingInterval, bbr.getPacingInterval());
}
for (size_t i = 0; i < 5; i++) {
sendFunc();
}
while (inflightPackets.size() &&
bbr.state() != BbrCongestionController::BbrState::ProbeBw) {
auto packetToAck = inflightPackets.back();
ackAndGrow(packetToAck.first, packetToAck.second);
inflightPackets.pop_back();
}
ASSERT_EQ(BbrCongestionController::BbrState::ProbeBw, bbr.state());
auto currentTime = Clock::now(); // cycleStart_ is before this TimePoint
// Throw out a big inflight bytes there. What a hack.
for (size_t i = 0; i < 100000; i++) {
sendFunc();
}
// Loop through kPacingGainCycles, we will collect 3 different burst sizes
std::set<uint64_t> burstSizes;
for (size_t i = 0; i < kNumOfCycles; i++) {
auto nextAckTime = currentTime + expectedMinRtt + 50us;
sendAckGrow(nextAckTime);
burstSizes.insert(bbr.getPacingRate(Clock::now()));
currentTime = nextAckTime;
}
EXPECT_EQ(3, burstSizes.size());
}
TEST_F(BbrTest, BytesCounting) {
QuicConnectionStateBase conn(QuicNodeType::Client);
BbrCongestionController::BbrConfig config;

View File

@@ -26,14 +26,14 @@ TEST_F(CongestionControlFunctionsTest, CalculatePacingRate) {
std::chrono::microseconds rtt(1000 * 100);
auto result =
calculatePacingRate(conn, 50, conn.transportSettings.minCwndInMss, rtt);
EXPECT_EQ(10ms, result.first);
EXPECT_EQ(5, result.second);
EXPECT_EQ(10ms, result.interval);
EXPECT_EQ(5, result.burstSize);
conn.transportSettings.pacingTimerTickInterval = 1ms;
auto result2 =
calculatePacingRate(conn, 300, conn.transportSettings.minCwndInMss, rtt);
EXPECT_EQ(1ms, result2.first);
EXPECT_EQ(3, result2.second);
EXPECT_EQ(1ms, result2.interval);
EXPECT_EQ(3, result2.burstSize);
}
TEST_F(CongestionControlFunctionsTest, MinPacingRate) {
@@ -42,8 +42,8 @@ TEST_F(CongestionControlFunctionsTest, MinPacingRate) {
conn.transportSettings.pacingTimerTickInterval = 1ms;
auto result = calculatePacingRate(
conn, 100, conn.transportSettings.minCwndInMss, 100000us);
EXPECT_EQ(1ms, result.first);
EXPECT_EQ(1, result.second);
EXPECT_EQ(1ms, result.interval);
EXPECT_EQ(1, result.burstSize);
}
TEST_F(CongestionControlFunctionsTest, SmallCwnd) {
@@ -52,8 +52,8 @@ TEST_F(CongestionControlFunctionsTest, SmallCwnd) {
conn.transportSettings.pacingTimerTickInterval = 1ms;
auto result = calculatePacingRate(
conn, 10, conn.transportSettings.minCwndInMss, 100000us);
EXPECT_EQ(10ms, result.first);
EXPECT_EQ(1, result.second);
EXPECT_EQ(10ms, result.interval);
EXPECT_EQ(1, result.burstSize);
}
TEST_F(CongestionControlFunctionsTest, RttSmallerThanInterval) {
@@ -62,9 +62,9 @@ TEST_F(CongestionControlFunctionsTest, RttSmallerThanInterval) {
conn.transportSettings.pacingTimerTickInterval = 10ms;
auto result =
calculatePacingRate(conn, 10, conn.transportSettings.minCwndInMss, 1ms);
EXPECT_EQ(std::chrono::milliseconds::zero(), result.first);
EXPECT_EQ(std::chrono::milliseconds::zero(), result.interval);
EXPECT_EQ(
conn.transportSettings.writeConnectionDataPacketsLimit, result.second);
conn.transportSettings.writeConnectionDataPacketsLimit, result.burstSize);
}

View File

@@ -363,7 +363,6 @@ TEST_F(CopaTest, TestVelocity) {
now += 100ms;
// velocity = 1, direction = 0
copa.onPacketAckOrLoss(createAckEvent(30, packetSize, now), folly::none);
auto logNow = Clock::now();
uint64_t cwndChange =
cwndChangeSteadyState(lastCwnd, velocity, packetSize, 0.5, conn);
@@ -371,15 +370,6 @@ TEST_F(CopaTest, TestVelocity) {
EXPECT_EQ(copa.getCongestionWindow(), lastCwnd + cwndChange);
lastCwnd = copa.getCongestionWindow();
std::vector<int> indices =
getQLogEventIndices(QLogEventType::PacingMetricUpdate, qLogger);
EXPECT_EQ(indices.size(), 3);
auto tmp = std::move(qLogger->logs[indices[2]]);
auto event = dynamic_cast<QLogPacingMetricUpdateEvent*>(tmp.get());
EXPECT_EQ(event->pacingBurstSize, copa.getPacingRate(logNow));
EXPECT_EQ(event->pacingInterval, copa.getPacingInterval());
// another ack, velocity = 1, direction 0 -> 1
now += 100ms;
copa.onPacketAckOrLoss(createAckEvent(35, packetSize, now), folly::none);

View File

@@ -9,6 +9,7 @@
#include <folly/portability/GTest.h>
#include <quic/common/test/TestUtils.h>
#include <quic/congestion_control/test/TestingCubic.h>
#include <quic/state/test/Mocks.h>
using namespace testing;
@@ -249,41 +250,51 @@ TEST_F(CubicTest, AppIdle) {
TEST_F(CubicTest, PacingGain) {
QuicConnectionStateBase conn(QuicNodeType::Client);
conn.transportSettings.pacingTimerTickInterval = 1ms;
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn.pacer = std::move(mockPacer);
auto qLogger = std::make_shared<FileQLogger>();
conn.qLogger = qLogger;
conn.udpSendPacketLen = 1500;
Cubic cubic(conn);
conn.lossState.srtt = 3000us;
auto packet = makeTestingWritePacket(0, 1500, 1500);
cubic.onPacketSent(packet);
EXPECT_CALL(*rawPacer, refreshPacingRate(_, _))
.Times(1)
.WillOnce(Invoke([&](uint64_t cwndBytes, std::chrono::microseconds) {
EXPECT_EQ(cubic.getCongestionWindow() * 2, cwndBytes);
}));
cubic.onPacketAckOrLoss(
makeAck(0, 1500, Clock::now(), packet.time), folly::none);
EXPECT_EQ(CubicStates::Hystart, cubic.state());
// 11 * 2 / (3 / 1), then take ceil
EXPECT_EQ(1ms, cubic.getPacingInterval());
EXPECT_EQ(8, cubic.getPacingRate(Clock::now()));
auto packet1 = makeTestingWritePacket(1, 1500, 3000);
cubic.onPacketSent(packet1);
CongestionController::LossEvent loss;
loss.addLostPacket(packet1);
// reduce cwnd to 9 MSS
EXPECT_CALL(*rawPacer, refreshPacingRate(_, _))
.Times(1)
.WillOnce(Invoke([&](uint64_t cwndBytes, std::chrono::microseconds) {
EXPECT_EQ(
static_cast<uint64_t>(cubic.getCongestionWindow() * 1.25),
cwndBytes);
}));
cubic.onPacketAckOrLoss(folly::none, loss);
EXPECT_EQ(CubicStates::FastRecovery, cubic.state());
// 9 * 1.25 / (3 / 1) then take ceil
EXPECT_EQ(1ms, cubic.getPacingInterval());
EXPECT_EQ(4, cubic.getPacingRate(Clock::now()));
auto packet2 = makeTestingWritePacket(2, 1500, 4500);
cubic.onPacketSent(packet2);
EXPECT_CALL(*rawPacer, refreshPacingRate(_, _))
.Times(1)
.WillOnce(Invoke([&](uint64_t cwndBytes, std::chrono::microseconds) {
EXPECT_EQ(cubic.getCongestionWindow(), cwndBytes);
}));
cubic.onPacketAckOrLoss(
makeAck(2, 1500, Clock::now(), packet2.time), folly::none);
EXPECT_EQ(CubicStates::Steady, cubic.state());
// Cwnd should still be very close to 9 mss
// 9 / (3 / 1)
EXPECT_EQ(1ms, cubic.getPacingInterval());
EXPECT_NEAR(3, cubic.getPacingRate(Clock::now()), 1);
std::vector<int> indices =
getQLogEventIndices(QLogEventType::TransportStateUpdate, qLogger);
@@ -292,52 +303,5 @@ TEST_F(CubicTest, PacingGain) {
auto event = dynamic_cast<QLogTransportStateUpdateEvent*>(tmp.get());
EXPECT_EQ(event->update, kRecalculateTimeToOrigin);
}
TEST_F(CubicTest, PacingSpread) {
QuicConnectionStateBase conn(QuicNodeType::Client);
conn.transportSettings.pacingTimerTickInterval = 1ms;
conn.lossState.srtt = 60ms;
conn.udpSendPacketLen = 1500;
Cubic::CubicBuilder builder;
builder.setPacingSpreadAcrossRtt(true);
auto cubic = builder.build(conn);
for (size_t i = 0; i < 5; i++) {
auto packet = makeTestingWritePacket(i, 1500, 4500 + 1500 * (1 + i));
cubic->onPacketSent(packet);
cubic->onPacketAckOrLoss(
makeAck(i, 1500, Clock::now(), packet.time), folly::none);
}
ASSERT_EQ(1500 * 15, cubic->getCongestionWindow());
EXPECT_EQ(1, cubic->getPacingRate(Clock::now()));
EXPECT_EQ(2ms, cubic->getPacingInterval());
}
TEST_F(CubicTest, LatePacingTimer) {
QuicConnectionStateBase conn(QuicNodeType::Client);
conn.transportSettings.pacingTimerTickInterval = 1ms;
conn.lossState.srtt = 50ms;
Cubic cubic(conn);
auto packet =
makeTestingWritePacket(0, conn.udpSendPacketLen, conn.udpSendPacketLen);
cubic.onPacketSent(packet);
cubic.onPacketAckOrLoss(
makeAck(0, conn.udpSendPacketLen, Clock::now(), packet.time),
folly::none);
auto currentTime = Clock::now();
auto pacingRateWithoutCompensation = cubic.getPacingRate(currentTime);
cubic.markPacerTimeoutScheduled(currentTime);
auto pacingRateWithCompensation = cubic.getPacingRate(currentTime + 50ms);
EXPECT_GT(pacingRateWithCompensation, pacingRateWithoutCompensation);
// No matter how late it comes, you cannot go beyond the max limit
auto veryLatePacingRate = cubic.getPacingRate(currentTime + 100s);
EXPECT_GE(conn.transportSettings.maxBurstPackets, veryLatePacingRate);
// But if you call getPacingRate again, it won't have compensation
auto pacingRateAgain = cubic.getPacingRate(currentTime + 50ms);
EXPECT_LT(pacingRateAgain, pacingRateWithCompensation);
}
} // namespace test
} // namespace quic

View File

@@ -187,7 +187,7 @@ void QuicServerTransport::writeData() {
uint64_t packetLimit =
(isConnectionPaced(*conn_)
? conn_->congestionController->getPacingRate(Clock::now())
? conn_->pacer->updateAndGetWriteBatchSize(Clock::now())
: conn_->transportSettings.writeConnectionDataPacketsLimit);
CryptoStreamScheduler initialScheduler(
*conn_, *getCryptoStream(*conn_->cryptoState, EncryptionLevel::Initial));

View File

@@ -147,8 +147,7 @@ void updateAckSendStateOnSentPacketWithAcks(
bool isConnectionPaced(const QuicConnectionStateBase& conn) noexcept {
return (
conn.transportSettings.pacingEnabled && conn.canBePaced &&
conn.congestionController);
conn.transportSettings.pacingEnabled && conn.canBePaced && conn.pacer);
}
AckState& getAckState(

View File

@@ -284,19 +284,6 @@ struct CongestionController {
virtual void setAppLimited() = 0;
virtual CongestionControlType type() const = 0;
/**
* Return pacing burst size.
*/
virtual uint64_t getPacingRate(TimePoint currentTime) = 0;
virtual std::chrono::microseconds getPacingInterval() const = 0;
/**
* Mark the time the transport schedules a pacing write. CongestionController
* needs to know this to compensate late time fires.
*/
virtual void markPacerTimeoutScheduled(TimePoint currentTime) = 0;
/**
* Whether the congestion controller thinks it's currently in app-limited
* state.
@@ -423,6 +410,9 @@ struct QuicConnectionStateBase {
// Connection Congestion controller
std::unique_ptr<CongestionController> congestionController;
// Pacer
std::unique_ptr<Pacer> pacer;
// Congestion Controller factory to create specific impl of cc algorithm
std::shared_ptr<CongestionControllerFactory> congestionControllerFactory;

View File

@@ -27,12 +27,17 @@ class MockCongestionController : public CongestionController {
MOCK_METHOD0(onSpuriousLoss, void());
GMOCK_METHOD1_(, , , setConnectionEmulation, void(uint8_t));
MOCK_CONST_METHOD0(type, CongestionControlType());
GMOCK_METHOD1_(, , , getPacingRate, uint64_t(TimePoint));
GMOCK_METHOD1_(, , , markPacerTimeoutScheduled, void(TimePoint));
MOCK_CONST_METHOD0(getPacingInterval, std::chrono::microseconds());
GMOCK_METHOD2_(, , , setAppIdle, void(bool, TimePoint));
MOCK_METHOD0(setAppLimited, void());
MOCK_CONST_METHOD0(isAppLimited, bool());
};
class MockPacer : public Pacer {
public:
MOCK_METHOD2(refreshPacingRate, void(uint64_t, std::chrono::microseconds));
MOCK_METHOD1(onPacedWriteScheduled, void(TimePoint));
MOCK_CONST_METHOD0(getTimeUntilNextWrite, std::chrono::microseconds());
MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint));
};
} // namespace test
} // namespace quic