1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-06 22:22:38 +03:00

Quic pacing refactor

Summary:
(1) The first change is the pacing rate calculation is simplified. It
removes the interval calculation and just uses the timer tick as the interval.
Then it calculates the burst size from there.  For most cases these two
calculation should land at the same result, except when the
`cwnd < minBurstSize * tick / RTT`. In that case, the current calculation would
spread writes evenly across one RTT, assuming no new Ack arrives during the RTT;
while the new calculation uses the first a few ticks to finish the cwnd amount
of data.

(2) Then this diff changes how we compensate late timer. Now the pacer will
maintain a nextWriteTime_ and lastWriteTime_, which makes it easier to
calculate time elapsed since last write. Then each time writer tries to write,
it will be allowed to write timeElapsed * pacingRate. This is much more
intuitive than the current logic.

(3) The diff also adds pacing limited tracking into the pacer. An expected
pacing rate is cached when pacing rate is refreshed by congestion controller.
Then with packets sent out, Pacer keeps calculating the current send rate. When
the send rate is lower, Pacer sets pacingLimited_ to true. Otherwise false.

Only when the connection is not pacing limited, the lastWriteTime_ will be
packet sent time, otherwise it will be set to the last nextWriteTime_. In other
words: if the send rate is lower than expected, we use the expected send time
instead of real send time to calculate time elapsed, to allow higher late
timer compenstation, to give pacer a chance to catch up.

(4) Finally this diff removes the token collecting behavior in the pacer. I
think having tokens increaed, instead of reset, when an ack refreshes the pacing
rate or when we compensate late time, is quite confusing to some people. After
all the above changes, I found tperf can still sustain good throughput without
always increase tokens, and rally actualy gives even better results. So i think
we can remove this part of the pacer that's potentially very confusing to
people who don't know how we got there.

Reviewed By: mjoras

Differential Revision: D19252744

fbshipit-source-id: b83e4a01fc812fc52117f3ec0f5c3be1badf211f
This commit is contained in:
Yang Chi
2020-01-17 10:08:46 -08:00
committed by Facebook Github Bot
parent 88efcd81f6
commit edb5104858
11 changed files with 117 additions and 136 deletions

View File

@@ -49,7 +49,6 @@ QuicTransportBase::QuicTransportBase(
LooperType::WriteLooper)) { LooperType::WriteLooper)) {
writeLooper_->setPacingFunction([this]() -> auto { writeLooper_->setPacingFunction([this]() -> auto {
if (isConnectionPaced(*conn_)) { if (isConnectionPaced(*conn_)) {
conn_->pacer->onPacedWriteScheduled(Clock::now());
return conn_->pacer->getTimeUntilNextWrite(); return conn_->pacer->getTimeUntilNextWrite();
} }
return 0us; return 0us;

View File

@@ -402,7 +402,7 @@ void updateConnection(
} }
} }
if (conn.pacer) { if (conn.pacer) {
conn.pacer->onPacketSent(); conn.pacer->onPacketSent(pkt.encodedSize);
} }
if (conn.pathValidationLimiter && if (conn.pathValidationLimiter &&
(conn.pendingEvents.pathChallenge || conn.outstandingPathValidation)) { (conn.pendingEvents.pathChallenge || conn.outstandingPathValidation)) {

View File

@@ -765,7 +765,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithPureAck) {
ackFrame.ackBlocks.emplace_back(0, 10); ackFrame.ackBlocks.emplace_back(0, 10);
packet.packet.frames.push_back(std::move(ackFrame)); packet.packet.frames.push_back(std::move(ackFrame));
EXPECT_CALL(*rawController, onPacketSent(_)).Times(0); EXPECT_CALL(*rawController, onPacketSent(_)).Times(0);
EXPECT_CALL(*rawPacer, onPacketSent()).Times(0); EXPECT_CALL(*rawPacer, onPacketSent(_)).Times(0);
updateConnection( updateConnection(
*conn, folly::none, packet.packet, TimePoint(), getEncodedSize(packet)); *conn, folly::none, packet.packet, TimePoint(), getEncodedSize(packet));
EXPECT_EQ(0, conn->outstandingPackets.size()); EXPECT_EQ(0, conn->outstandingPackets.size());
@@ -1020,7 +1020,7 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicdataToSocketWithPacer) {
IOBuf::copyBuffer("0123456789012012345678901201234567890120123456789012"); IOBuf::copyBuffer("0123456789012012345678901201234567890120123456789012");
writeDataToQuicStream(*stream1, buf->clone(), true); writeDataToQuicStream(*stream1, buf->clone(), true);
EXPECT_CALL(*rawPacer, onPacketSent()).Times(1); EXPECT_CALL(*rawPacer, onPacketSent(_)).Times(1);
EXPECT_CALL(*transportInfoCb_, onWrite(_)); EXPECT_CALL(*transportInfoCb_, onWrite(_));
writeQuicDataToSocket( writeQuicDataToSocket(
*rawSocket, *rawSocket,

View File

@@ -2511,7 +2511,6 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) {
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
.WillRepeatedly(Return(1)); .WillRepeatedly(Return(1));
EXPECT_CALL(*rawPacer, onPacedWriteScheduled(_));
EXPECT_CALL(*rawPacer, getTimeUntilNextWrite()) EXPECT_CALL(*rawPacer, getTimeUntilNextWrite())
.WillRepeatedly(Return(3600000ms)); .WillRepeatedly(Return(3600000ms));
// This will write out 100 bytes, leave 100 bytes behind. FunctionLooper will // This will write out 100 bytes, leave 100 bytes behind. FunctionLooper will

View File

@@ -29,29 +29,21 @@ PacingRate calculatePacingRate(
uint64_t cwnd, uint64_t cwnd,
uint64_t minCwndInMss, uint64_t minCwndInMss,
std::chrono::microseconds rtt) { std::chrono::microseconds rtt) {
if (conn.transportSettings.pacingTimerTickInterval > rtt) { if (conn.transportSettings.pacingTimerTickInterval >= rtt) {
// We cannot really pace in this case. // We cannot really pace in this case.
return PacingRate::Builder() return PacingRate::Builder()
.setInterval(0us) .setInterval(0us)
.setBurstSize(conn.transportSettings.writeConnectionDataPacketsLimit) .setBurstSize(conn.transportSettings.writeConnectionDataPacketsLimit)
.build(); .build();
} }
uint64_t numIntervals = rtt / conn.transportSettings.pacingTimerTickInterval;
uint64_t cwndInPackets = std::max(minCwndInMss, cwnd / conn.udpSendPacketLen); uint64_t cwndInPackets = std::max(minCwndInMss, cwnd / conn.udpSendPacketLen);
// Each interval we want to send cwndInpackets / (rtt / minimalInverval)
// number of packets.
uint64_t burstPerInterval = std::max( uint64_t burstPerInterval = std::max(
conn.transportSettings.minBurstPackets, conn.transportSettings.minBurstPackets, cwndInPackets / numIntervals);
static_cast<uint64_t>(std::ceil(
static_cast<double>(cwndInPackets) *
static_cast<double>(
conn.transportSettings.pacingTimerTickInterval.count()) /
static_cast<double>(rtt.count()))));
auto interval = timeMax(
conn.transportSettings.pacingTimerTickInterval,
rtt * burstPerInterval / cwndInPackets);
return PacingRate::Builder() return PacingRate::Builder()
.setInterval(interval) .setInterval(conn.transportSettings.pacingTimerTickInterval)
.setBurstSize(burstPerInterval) .setBurstSize(burstPerInterval)
.build(); .build();
} }
} // namespace quic } // namespace quic

View File

@@ -8,6 +8,7 @@
#include <quic/congestion_control/Pacer.h> #include <quic/congestion_control/Pacer.h>
#include <quic/common/TimeUtil.h>
#include <quic/congestion_control/CongestionControlFunctions.h> #include <quic/congestion_control/CongestionControlFunctions.h>
#include <quic/logging/QuicLogger.h> #include <quic/logging/QuicLogger.h>
@@ -21,7 +22,9 @@ DefaultPacer::DefaultPacer(
batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit),
pacingRateCalculator_(calculatePacingRate), pacingRateCalculator_(calculatePacingRate),
cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit), cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit),
tokens_(conn.transportSettings.writeConnectionDataPacketsLimit) {} tokens_(conn.transportSettings.writeConnectionDataPacketsLimit),
nextWriteTime_(Clock::now()),
lastWriteTime_(Clock::now()) {}
// TODO: we choose to keep refershing pacing rate even when we are app-limited, // TODO: we choose to keep refershing pacing rate even when we are app-limited,
// so that when we exit app-limited, we have an updated pacing rate. But I don't // so that when we exit app-limited, we have an updated pacing rate. But I don't
@@ -29,15 +32,17 @@ DefaultPacer::DefaultPacer(
void DefaultPacer::refreshPacingRate( void DefaultPacer::refreshPacingRate(
uint64_t cwndBytes, uint64_t cwndBytes,
std::chrono::microseconds rtt) { std::chrono::microseconds rtt) {
auto currentTime = Clock::now();
if (rtt < conn_.transportSettings.pacingTimerTickInterval) { if (rtt < conn_.transportSettings.pacingTimerTickInterval) {
writeInterval_ = 0us; writeInterval_ = 0us;
batchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit; batchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit;
} else { } else {
const PacingRate pacingRate = const auto pacingRate =
pacingRateCalculator_(conn_, cwndBytes, minCwndInMss_, rtt); pacingRateCalculator_(conn_, cwndBytes, minCwndInMss_, rtt);
writeInterval_ = pacingRate.interval; writeInterval_ = pacingRate.interval;
batchSize_ = pacingRate.burstSize; batchSize_ = pacingRate.burstSize;
tokens_ += batchSize_; lastPacingRateUpdate_ = currentTime;
bytesSentSincePacingRateUpdate_ = 0;
} }
if (conn_.qLogger) { if (conn_.qLogger) {
conn_.qLogger->addPacingMetricUpdate(batchSize_, writeInterval_); conn_.qLogger->addPacingMetricUpdate(batchSize_, writeInterval_);
@@ -45,16 +50,40 @@ void DefaultPacer::refreshPacingRate(
QUIC_TRACE( QUIC_TRACE(
pacing_update, conn_, writeInterval_.count(), (uint64_t)batchSize_); pacing_update, conn_, writeInterval_.count(), (uint64_t)batchSize_);
cachedBatchSize_ = batchSize_; cachedBatchSize_ = batchSize_;
tokens_ = batchSize_;
nextWriteTime_ = currentTime;
if (firstUpdate_) {
firstUpdate_ = false;
lastWriteTime_ = currentTime;
}
} }
void DefaultPacer::onPacedWriteScheduled(TimePoint currentTime) { void DefaultPacer::onPacketSent(uint64_t bytesSent) {
scheduledWriteTime_ = currentTime;
}
void DefaultPacer::onPacketSent() {
if (tokens_) { if (tokens_) {
--tokens_; --tokens_;
} }
bytesSentSincePacingRateUpdate_ += bytesSent;
if (writeInterval_ != 0us && cachedBatchSize_ && !appLimited_ &&
lastPacingRateUpdate_) {
Bandwidth expectedBandwidth(
cachedBatchSize_ * conn_.udpSendPacketLen, writeInterval_);
if (expectedBandwidth) {
Bandwidth actualPacingBandwidth(
bytesSentSincePacingRateUpdate_,
std::chrono::duration_cast<std::chrono::microseconds>(
Clock::now() - *lastPacingRateUpdate_));
pacingLimited_ = actualPacingBandwidth < expectedBandwidth;
}
} else {
pacingLimited_ = false;
}
if (!pacingLimited_) {
lastWriteTime_ = Clock::now();
}
}
bool DefaultPacer::isPacingLimited() const noexcept {
return pacingLimited_;
} }
void DefaultPacer::onPacketsLoss() { void DefaultPacer::onPacketsLoss() {
@@ -62,34 +91,30 @@ void DefaultPacer::onPacketsLoss() {
} }
std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const { std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const {
return (appLimited_ || tokens_) ? 0us : writeInterval_; return (writeInterval_ == 0us || appLimited_ || tokens_ ||
Clock::now() + conn_.transportSettings.pacingTimerTickInterval >=
nextWriteTime_)
? 0us
: timeMax(
conn_.transportSettings.pacingTimerTickInterval,
timeMin(
writeInterval_,
std::chrono::duration_cast<std::chrono::microseconds>(
nextWriteTime_ - Clock::now())));
} }
uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) { uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) {
SCOPE_EXIT { SCOPE_EXIT {
scheduledWriteTime_.clear(); lastWriteTime_ = nextWriteTime_;
nextWriteTime_ += writeInterval_;
}; };
if (appLimited_) { if (appLimited_ || writeInterval_ == 0us) {
cachedBatchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit; return conn_.transportSettings.writeConnectionDataPacketsLimit;
return cachedBatchSize_;
}
if (writeInterval_ == 0us) {
return batchSize_;
}
if (!scheduledWriteTime_ || *scheduledWriteTime_ >= currentTime) {
return tokens_;
} }
auto adjustedInterval = std::chrono::duration_cast<std::chrono::microseconds>( auto adjustedInterval = std::chrono::duration_cast<std::chrono::microseconds>(
currentTime - *scheduledWriteTime_ + writeInterval_); timeMax(currentTime - lastWriteTime_, writeInterval_));
cachedBatchSize_ = std::ceil( return std::ceil(
adjustedInterval.count() * batchSize_ * 1.0 / writeInterval_.count()); adjustedInterval.count() * batchSize_ * 1.0 / writeInterval_.count());
if (cachedBatchSize_ < batchSize_) {
LOG(ERROR)
<< "Quic pacer batch size calculation: cachedBatchSize < batchSize";
}
tokens_ +=
(cachedBatchSize_ > batchSize_ ? cachedBatchSize_ - batchSize_ : 0);
return tokens_;
} }
uint64_t DefaultPacer::getCachedWriteBatchSize() const { uint64_t DefaultPacer::getCachedWriteBatchSize() const {

View File

@@ -8,6 +8,7 @@
#pragma once #pragma once
#include <quic/congestion_control/Bandwidth.h>
#include <quic/state/StateData.h> #include <quic/state/StateData.h>
namespace quic { namespace quic {
@@ -31,8 +32,6 @@ class DefaultPacer : public Pacer {
void refreshPacingRate(uint64_t cwndBytes, std::chrono::microseconds rtt) void refreshPacingRate(uint64_t cwndBytes, std::chrono::microseconds rtt)
override; override;
void onPacedWriteScheduled(TimePoint currentTime) override;
std::chrono::microseconds getTimeUntilNextWrite() const override; std::chrono::microseconds getTimeUntilNextWrite() const override;
uint64_t updateAndGetWriteBatchSize(TimePoint currentTime) override; uint64_t updateAndGetWriteBatchSize(TimePoint currentTime) override;
@@ -43,9 +42,12 @@ class DefaultPacer : public Pacer {
void setAppLimited(bool limited) override; void setAppLimited(bool limited) override;
void onPacketSent() override; void onPacketSent(uint64_t bytesSent) override;
void onPacketsLoss() override; void onPacketsLoss() override;
// Only used for test:
bool isPacingLimited() const noexcept;
private: private:
const QuicConnectionStateBase& conn_; const QuicConnectionStateBase& conn_;
uint64_t minCwndInMss_; uint64_t minCwndInMss_;
@@ -56,5 +58,11 @@ class DefaultPacer : public Pacer {
uint64_t cachedBatchSize_; uint64_t cachedBatchSize_;
bool appLimited_{false}; bool appLimited_{false};
uint64_t tokens_; uint64_t tokens_;
uint64_t bytesSentSincePacingRateUpdate_{0};
folly::Optional<TimePoint> lastPacingRateUpdate_;
bool pacingLimited_{false};
TimePoint nextWriteTime_;
TimePoint lastWriteTime_;
bool firstUpdate_{true};
}; };
} // namespace quic } // namespace quic

View File

@@ -43,9 +43,7 @@ TEST_F(CongestionControlFunctionsTest, MinPacingRate) {
conn.transportSettings.pacingTimerTickInterval = 1ms; conn.transportSettings.pacingTimerTickInterval = 1ms;
auto result = calculatePacingRate( auto result = calculatePacingRate(
conn, 100, conn.transportSettings.minCwndInMss, 100ms); conn, 100, conn.transportSettings.minCwndInMss, 100ms);
// 100 ms rtt, 1ms tick interval, 100 mss cwnd, 5 mss min burst -> 5 mss every EXPECT_EQ(1ms, result.interval);
// 5ms
EXPECT_EQ(5ms, result.interval);
EXPECT_EQ(conn.transportSettings.minBurstPackets, result.burstSize); EXPECT_EQ(conn.transportSettings.minBurstPackets, result.burstSize);
} }
@@ -56,7 +54,7 @@ TEST_F(CongestionControlFunctionsTest, SmallCwnd) {
conn.transportSettings.pacingTimerTickInterval = 1ms; conn.transportSettings.pacingTimerTickInterval = 1ms;
auto result = calculatePacingRate( auto result = calculatePacingRate(
conn, 10, conn.transportSettings.minCwndInMss, 100000us); conn, 10, conn.transportSettings.minCwndInMss, 100000us);
EXPECT_EQ(10ms, result.interval); EXPECT_EQ(1ms, result.interval);
EXPECT_EQ(1, result.burstSize); EXPECT_EQ(1, result.burstSize);
} }
@@ -72,6 +70,5 @@ TEST_F(CongestionControlFunctionsTest, RttSmallerThanInterval) {
conn.transportSettings.writeConnectionDataPacketsLimit, result.burstSize); conn.transportSettings.writeConnectionDataPacketsLimit, result.burstSize);
} }
} // namespace test } // namespace test
} // namespace quic } // namespace quic

View File

@@ -17,7 +17,7 @@ namespace test {
namespace { namespace {
void consumeTokensHelper(Pacer& pacer, size_t tokensToConsume) { void consumeTokensHelper(Pacer& pacer, size_t tokensToConsume) {
for (size_t i = 0; i < tokensToConsume; i++) { for (size_t i = 0; i < tokensToConsume; i++) {
pacer.onPacketSent(); pacer.onPacketSent(1000);
} }
} }
} // namespace } // namespace
@@ -45,16 +45,19 @@ TEST_F(PacerTest, RateCalculator) {
uint64_t, uint64_t,
uint64_t, uint64_t,
std::chrono::microseconds) { std::chrono::microseconds) {
return PacingRate::Builder().setInterval(1234us).setBurstSize(4321).build(); return PacingRate::Builder().setInterval(500ms).setBurstSize(4321).build();
}); });
auto currentTime = Clock::now();
pacer.refreshPacingRate(200000, 200us); pacer.refreshPacingRate(200000, 200us);
EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
EXPECT_EQ( EXPECT_EQ(4321, pacer.updateAndGetWriteBatchSize(currentTime));
4321 + conn.transportSettings.writeConnectionDataPacketsLimit, consumeTokensHelper(pacer, 4321);
pacer.updateAndGetWriteBatchSize(Clock::now())); EXPECT_NEAR(
consumeTokensHelper( std::chrono::duration_cast<std::chrono::microseconds>(
pacer, 4321 + conn.transportSettings.writeConnectionDataPacketsLimit); 500ms + currentTime - Clock::now())
EXPECT_EQ(1234us, pacer.getTimeUntilNextWrite()); .count(),
pacer.getTimeUntilNextWrite().count(),
2000);
} }
TEST_F(PacerTest, CompensateTimerDrift) { TEST_F(PacerTest, CompensateTimerDrift) {
@@ -66,23 +69,18 @@ TEST_F(PacerTest, CompensateTimerDrift) {
}); });
auto currentTime = Clock::now(); auto currentTime = Clock::now();
pacer.refreshPacingRate(20, 100us); // These two values do not matter here pacer.refreshPacingRate(20, 100us); // These two values do not matter here
pacer.onPacedWriteScheduled(currentTime); // After refresh, both last and next write time is very close to currentTime
EXPECT_EQ( EXPECT_NEAR(10, pacer.updateAndGetWriteBatchSize(currentTime + 1000us), 2);
20 + conn.transportSettings.writeConnectionDataPacketsLimit, // lastWriteTime ~= currentTime, nextWriteTime ~= currentTime + 1000us
pacer.updateAndGetWriteBatchSize(currentTime + 1000us));
// Query batch size again without calling onPacedWriteScheduled won't do timer EXPECT_NEAR(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us), 2);
// drift compensation. But token_ keeps the last compenstation. // lastWriteTime ~= currentTime + 1000us, nextWriteTime ~= currentTime +
EXPECT_EQ( // 2000us
20 + conn.transportSettings.writeConnectionDataPacketsLimit,
pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
// Consume a few: // Consume a few:
consumeTokensHelper(pacer, 3); consumeTokensHelper(pacer, 3);
EXPECT_EQ( EXPECT_NEAR(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us), 2);
20 + conn.transportSettings.writeConnectionDataPacketsLimit - 3,
pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
} }
TEST_F(PacerTest, NextWriteTime) { TEST_F(PacerTest, NextWriteTime) {
@@ -94,17 +92,18 @@ TEST_F(PacerTest, NextWriteTime) {
std::chrono::microseconds rtt) { std::chrono::microseconds rtt) {
return PacingRate::Builder().setInterval(rtt).setBurstSize(10).build(); return PacingRate::Builder().setInterval(rtt).setBurstSize(10).build();
}); });
pacer.refreshPacingRate(20, 1000us); auto currentTime = Clock::now();
pacer.refreshPacingRate(20, 100ms);
// Right after refresh, it's always 0us. You can always send right after an // Right after refresh, it's always 0us. You can always send right after an
// ack. // ack.
EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
pacer.updateAndGetWriteBatchSize(currentTime);
// Consume all the tokens: // Consume all the tokens:
consumeTokensHelper( consumeTokensHelper(pacer, 10);
pacer, 10 + conn.transportSettings.writeConnectionDataPacketsLimit);
// Then we use real delay: // Then we use real delay:
EXPECT_EQ(1000us, pacer.getTimeUntilNextWrite()); EXPECT_NEAR(100 * 1000, pacer.getTimeUntilNextWrite().count(), 1000);
} }
TEST_F(PacerTest, ImpossibleToPace) { TEST_F(PacerTest, ImpossibleToPace) {
@@ -138,17 +137,16 @@ TEST_F(PacerTest, CachedBatchSize) {
.setBurstSize(cwndBytes / conn.udpSendPacketLen * 2) .setBurstSize(cwndBytes / conn.udpSendPacketLen * 2)
.build(); .build();
}); });
auto currentTime = Clock::now();
pacer.refreshPacingRate(20 * conn.udpSendPacketLen, 100ms); pacer.refreshPacingRate(20 * conn.udpSendPacketLen, 100ms);
EXPECT_EQ(40, pacer.getCachedWriteBatchSize()); EXPECT_EQ(40, pacer.getCachedWriteBatchSize());
auto currentTime = Clock::now();
pacer.onPacedWriteScheduled(currentTime);
pacer.updateAndGetWriteBatchSize(currentTime); pacer.updateAndGetWriteBatchSize(currentTime);
// lastWriteTime ~= currentTime, nextWriteTime_ ~= currentTime + 100ms
EXPECT_EQ(40, pacer.getCachedWriteBatchSize()); EXPECT_EQ(40, pacer.getCachedWriteBatchSize());
pacer.onPacedWriteScheduled(currentTime + 100ms); EXPECT_EQ(80, pacer.updateAndGetWriteBatchSize(currentTime + 200ms));
pacer.updateAndGetWriteBatchSize(currentTime + 200ms); EXPECT_EQ(40, pacer.getCachedWriteBatchSize());
EXPECT_EQ(80, pacer.getCachedWriteBatchSize());
} }
TEST_F(PacerTest, AppLimited) { TEST_F(PacerTest, AppLimited) {
@@ -158,48 +156,19 @@ TEST_F(PacerTest, AppLimited) {
EXPECT_EQ(12, pacer.updateAndGetWriteBatchSize(Clock::now())); EXPECT_EQ(12, pacer.updateAndGetWriteBatchSize(Clock::now()));
} }
TEST_F(PacerTest, Tokens) { TEST_F(PacerTest, PacingLimited) {
// Pacer has tokens right after init: pacer.setPacingRateCalculator([](const QuicConnectionStateBase& conn,
EXPECT_EQ(0us, pacer.getTimeUntilNextWrite()); uint64_t cwndBytes,
EXPECT_EQ(
conn.transportSettings.writeConnectionDataPacketsLimit,
pacer.updateAndGetWriteBatchSize(Clock::now()));
// Consume all initial tokens:
consumeTokensHelper(
pacer, conn.transportSettings.writeConnectionDataPacketsLimit);
// Pacing rate: 10 mss per 10 ms
pacer.setPacingRateCalculator([](const QuicConnectionStateBase&,
uint64_t, uint64_t,
uint64_t, std::chrono::microseconds rtt) {
std::chrono::microseconds) { return PacingRate::Builder()
return PacingRate::Builder().setInterval(10ms).setBurstSize(10).build(); .setInterval(rtt)
.setBurstSize(cwndBytes / conn.udpSendPacketLen)
.build();
}); });
pacer.refreshPacingRate(2000 * conn.udpSendPacketLen, 1us);
// These input doesn't matter, the rate calculator above returns fixed values. pacer.onPacketSent(1);
pacer.refreshPacingRate(100, 100ms); EXPECT_TRUE(pacer.isPacingLimited());
EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(Clock::now()));
// Consume all tokens:
consumeTokensHelper(pacer, 10);
EXPECT_EQ(10ms, pacer.getTimeUntilNextWrite());
EXPECT_EQ(0, pacer.updateAndGetWriteBatchSize(Clock::now()));
// Do a schedule:
auto curTime = Clock::now();
pacer.onPacedWriteScheduled(curTime);
// 10ms later you should have 10 mss credit:
EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(curTime + 10ms));
// Schedule again from this point:
pacer.onPacedWriteScheduled(curTime + 10ms);
// Then elapse another 10ms, and previous tokens hasn't been used:
EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(curTime + 20ms));
} }
} // namespace test } // namespace test
} // namespace quic } // namespace quic

View File

@@ -164,14 +164,6 @@ struct Pacer {
uint64_t cwndBytes, uint64_t cwndBytes,
std::chrono::microseconds rtt) = 0; std::chrono::microseconds rtt) = 0;
/**
* Notify the Pacer that a paced write is scheduled.
*
* currentTime: the time that the timer is scheduled. NOT the time that a
* write is scheduled to happen.
*/
virtual void onPacedWriteScheduled(TimePoint currentTime) = 0;
/** /**
* API for Trnasport to query the interval before next write * API for Trnasport to query the interval before next write
*/ */
@@ -193,7 +185,7 @@ struct Pacer {
virtual uint64_t getCachedWriteBatchSize() const = 0; virtual uint64_t getCachedWriteBatchSize() const = 0;
virtual void setAppLimited(bool limited) = 0; virtual void setAppLimited(bool limited) = 0;
virtual void onPacketSent() = 0; virtual void onPacketSent(uint64_t sentBytes) = 0;
virtual void onPacketsLoss() = 0; virtual void onPacketsLoss() = 0;
}; };
@@ -201,6 +193,7 @@ struct PacingRate {
std::chrono::microseconds interval{0us}; std::chrono::microseconds interval{0us};
uint64_t burstSize{0}; uint64_t burstSize{0};
PacingRate() = default;
struct Builder { struct Builder {
Builder&& setInterval(std::chrono::microseconds interval) &&; Builder&& setInterval(std::chrono::microseconds interval) &&;
Builder&& setBurstSize(uint64_t burstSize) &&; Builder&& setBurstSize(uint64_t burstSize) &&;

View File

@@ -34,12 +34,11 @@ class MockCongestionController : public CongestionController {
class MockPacer : public Pacer { class MockPacer : public Pacer {
public: public:
MOCK_METHOD2(refreshPacingRate, void(uint64_t, std::chrono::microseconds)); MOCK_METHOD2(refreshPacingRate, void(uint64_t, std::chrono::microseconds));
MOCK_METHOD1(onPacedWriteScheduled, void(TimePoint));
MOCK_CONST_METHOD0(getTimeUntilNextWrite, std::chrono::microseconds()); MOCK_CONST_METHOD0(getTimeUntilNextWrite, std::chrono::microseconds());
MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint)); MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint));
MOCK_CONST_METHOD0(getCachedWriteBatchSize, uint64_t()); MOCK_CONST_METHOD0(getCachedWriteBatchSize, uint64_t());
MOCK_METHOD1(setAppLimited, void(bool)); MOCK_METHOD1(setAppLimited, void(bool));
MOCK_METHOD0(onPacketSent, void()); MOCK_METHOD1(onPacketSent, void(uint64_t));
MOCK_METHOD0(onPacketsLoss, void()); MOCK_METHOD0(onPacketsLoss, void());
}; };