mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Back out "Quic pacing refactor"

Summary: Original commit changeset: b83e4a01fc81

Reviewed By: mjoras

Differential Revision: D19644828

fbshipit-source-id: 83d5a3454c6f9a8364e970d236cba008aef85fbd
Author: Yang Chi
Date: 2020-01-30 18:30:45 -08:00
Committed by: Facebook Github Bot
Parent: 3b55c3d0fe
Commit: d5b454a9c0
11 changed files with 136 additions and 117 deletions


@@ -49,6 +49,7 @@ QuicTransportBase::QuicTransportBase(
           LooperType::WriteLooper)) {
   writeLooper_->setPacingFunction([this]() -> auto {
     if (isConnectionPaced(*conn_)) {
+      conn_->pacer->onPacedWriteScheduled(Clock::now());
       return conn_->pacer->getTimeUntilNextWrite();
     }
     return 0us;
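
The restored contract between the write looper and the pacer reads in two steps: tell the pacer when the pacing timer is armed, then arm it for the returned delay. The restored callback again, with explanatory comments added here (the comments are editorial, not part of the commit):

    writeLooper_->setPacingFunction([this]() -> auto {
      if (isConnectionPaced(*conn_)) {
        // Record when this timer is armed; updateAndGetWriteBatchSize() later
        // uses the timestamp to compensate for timer drift.
        conn_->pacer->onPacedWriteScheduled(Clock::now());
        return conn_->pacer->getTimeUntilNextWrite();
      }
      return 0us; // unpaced connections may write immediately
    });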


@@ -402,7 +402,7 @@ void updateConnection(
     }
   }
   if (conn.pacer) {
-    conn.pacer->onPacketSent(pkt.encodedSize);
+    conn.pacer->onPacketSent();
   }
   if (conn.pathValidationLimiter &&
       (conn.pendingEvents.pathChallenge || conn.outstandingPathValidation)) {


@@ -765,7 +765,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithPureAck) {
   ackFrame.ackBlocks.emplace_back(0, 10);
   packet.packet.frames.push_back(std::move(ackFrame));
   EXPECT_CALL(*rawController, onPacketSent(_)).Times(0);
-  EXPECT_CALL(*rawPacer, onPacketSent(_)).Times(0);
+  EXPECT_CALL(*rawPacer, onPacketSent()).Times(0);
   updateConnection(
       *conn, folly::none, packet.packet, TimePoint(), getEncodedSize(packet));
   EXPECT_EQ(0, conn->outstandingPackets.size());
@@ -1020,7 +1020,7 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicdataToSocketWithPacer) {
       IOBuf::copyBuffer("0123456789012012345678901201234567890120123456789012");
   writeDataToQuicStream(*stream1, buf->clone(), true);
-  EXPECT_CALL(*rawPacer, onPacketSent(_)).Times(1);
+  EXPECT_CALL(*rawPacer, onPacketSent()).Times(1);
   EXPECT_CALL(*transportInfoCb_, onWrite(_));
   writeQuicDataToSocket(
       *rawSocket,


@@ -2511,6 +2511,7 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) {
   EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
   EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
       .WillRepeatedly(Return(1));
+  EXPECT_CALL(*rawPacer, onPacedWriteScheduled(_));
   EXPECT_CALL(*rawPacer, getTimeUntilNextWrite())
       .WillRepeatedly(Return(3600000ms));
   // This will write out 100 bytes, leave 100 bytes behind. FunctionLooper will


@@ -29,21 +29,29 @@ PacingRate calculatePacingRate(
     uint64_t cwnd,
     uint64_t minCwndInMss,
     std::chrono::microseconds rtt) {
-  if (conn.transportSettings.pacingTimerTickInterval >= rtt) {
+  if (conn.transportSettings.pacingTimerTickInterval > rtt) {
     // We cannot really pace in this case.
     return PacingRate::Builder()
         .setInterval(0us)
         .setBurstSize(conn.transportSettings.writeConnectionDataPacketsLimit)
         .build();
   }
-  uint64_t numIntervals = rtt / conn.transportSettings.pacingTimerTickInterval;
   uint64_t cwndInPackets = std::max(minCwndInMss, cwnd / conn.udpSendPacketLen);
+  // Each interval we want to send cwndInpackets / (rtt / minimalInverval)
+  // number of packets.
   uint64_t burstPerInterval = std::max(
-      conn.transportSettings.minBurstPackets, cwndInPackets / numIntervals);
+      conn.transportSettings.minBurstPackets,
+      static_cast<uint64_t>(std::ceil(
+          static_cast<double>(cwndInPackets) *
+          static_cast<double>(
+              conn.transportSettings.pacingTimerTickInterval.count()) /
+          static_cast<double>(rtt.count()))));
+  auto interval = timeMax(
+      conn.transportSettings.pacingTimerTickInterval,
+      rtt * burstPerInterval / cwndInPackets);
   return PacingRate::Builder()
-      .setInterval(conn.transportSettings.pacingTimerTickInterval)
+      .setInterval(interval)
       .setBurstSize(burstPerInterval)
       .build();
 }
 } // namespace quic
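
To see what the restored formula produces, plug in the values from the MinPacingRate test updated later in this commit (a worked example, not code from the commit): rtt = 100ms, tick interval = 1ms, cwndInPackets = 100, minBurstPackets = 5.

    burstPerInterval = max(5, ceil(100 * 1ms / 100ms)) = max(5, 1) = 5
    interval         = max(1ms, 100ms * 5 / 100)       = max(1ms, 5ms) = 5ms

When the min-burst floor wins, the interval is stretched to match (5ms instead of the 1ms tick), so the overall rate still honors the congestion window: 5 packets every 5ms is 100 packets per 100ms RTT.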


@@ -8,7 +8,6 @@
 #include <quic/congestion_control/Pacer.h>
 
-#include <quic/common/TimeUtil.h>
 #include <quic/congestion_control/CongestionControlFunctions.h>
 #include <quic/logging/QuicLogger.h>
@@ -22,9 +21,7 @@ DefaultPacer::DefaultPacer(
       batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit),
       pacingRateCalculator_(calculatePacingRate),
       cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit),
-      tokens_(conn.transportSettings.writeConnectionDataPacketsLimit),
-      nextWriteTime_(Clock::now()),
-      lastWriteTime_(Clock::now()) {}
+      tokens_(conn.transportSettings.writeConnectionDataPacketsLimit) {}
 
 // TODO: we choose to keep refershing pacing rate even when we are app-limited,
 // so that when we exit app-limited, we have an updated pacing rate. But I don't
@@ -32,17 +29,15 @@ DefaultPacer::DefaultPacer(
 void DefaultPacer::refreshPacingRate(
     uint64_t cwndBytes,
     std::chrono::microseconds rtt) {
-  auto currentTime = Clock::now();
   if (rtt < conn_.transportSettings.pacingTimerTickInterval) {
     writeInterval_ = 0us;
     batchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit;
   } else {
-    const auto pacingRate =
+    const PacingRate pacingRate =
         pacingRateCalculator_(conn_, cwndBytes, minCwndInMss_, rtt);
     writeInterval_ = pacingRate.interval;
     batchSize_ = pacingRate.burstSize;
-    lastPacingRateUpdate_ = currentTime;
-    bytesSentSincePacingRateUpdate_ = 0;
+    tokens_ += batchSize_;
   }
   if (conn_.qLogger) {
     conn_.qLogger->addPacingMetricUpdate(batchSize_, writeInterval_);
@@ -50,40 +45,16 @@ void DefaultPacer::refreshPacingRate(
   QUIC_TRACE(
       pacing_update, conn_, writeInterval_.count(), (uint64_t)batchSize_);
   cachedBatchSize_ = batchSize_;
-  tokens_ = batchSize_;
-  nextWriteTime_ = currentTime;
-  if (firstUpdate_) {
-    firstUpdate_ = false;
-    lastWriteTime_ = currentTime;
-  }
 }
 
-void DefaultPacer::onPacketSent(uint64_t bytesSent) {
+void DefaultPacer::onPacedWriteScheduled(TimePoint currentTime) {
+  scheduledWriteTime_ = currentTime;
+}
+
+void DefaultPacer::onPacketSent() {
   if (tokens_) {
     --tokens_;
   }
-  bytesSentSincePacingRateUpdate_ += bytesSent;
-  if (writeInterval_ != 0us && cachedBatchSize_ && !appLimited_ &&
-      lastPacingRateUpdate_) {
-    Bandwidth expectedBandwidth(
-        cachedBatchSize_ * conn_.udpSendPacketLen, writeInterval_);
-    if (expectedBandwidth) {
-      Bandwidth actualPacingBandwidth(
-          bytesSentSincePacingRateUpdate_,
-          std::chrono::duration_cast<std::chrono::microseconds>(
-              Clock::now() - *lastPacingRateUpdate_));
-      pacingLimited_ = actualPacingBandwidth < expectedBandwidth;
-    }
-  } else {
-    pacingLimited_ = false;
-  }
-  if (!pacingLimited_) {
-    lastWriteTime_ = Clock::now();
-  }
-}
-
-bool DefaultPacer::isPacingLimited() const noexcept {
-  return pacingLimited_;
 }
 
 void DefaultPacer::onPacketsLoss() {
@@ -91,30 +62,34 @@ void DefaultPacer::onPacketsLoss() {
 }
 
 std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const {
-  return (writeInterval_ == 0us || appLimited_ || tokens_ ||
-          Clock::now() + conn_.transportSettings.pacingTimerTickInterval >=
-              nextWriteTime_)
-      ? 0us
-      : timeMax(
-            conn_.transportSettings.pacingTimerTickInterval,
-            timeMin(
-                writeInterval_,
-                std::chrono::duration_cast<std::chrono::microseconds>(
-                    nextWriteTime_ - Clock::now())));
+  return (appLimited_ || tokens_) ? 0us : writeInterval_;
 }
 
 uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) {
   SCOPE_EXIT {
-    lastWriteTime_ = nextWriteTime_;
-    nextWriteTime_ += writeInterval_;
+    scheduledWriteTime_.clear();
   };
-  if (appLimited_ || writeInterval_ == 0us) {
-    return conn_.transportSettings.writeConnectionDataPacketsLimit;
+  if (appLimited_) {
+    cachedBatchSize_ = conn_.transportSettings.writeConnectionDataPacketsLimit;
+    return cachedBatchSize_;
+  }
+  if (writeInterval_ == 0us) {
+    return batchSize_;
+  }
+  if (!scheduledWriteTime_ || *scheduledWriteTime_ >= currentTime) {
+    return tokens_;
   }
   auto adjustedInterval = std::chrono::duration_cast<std::chrono::microseconds>(
-      timeMax(currentTime - lastWriteTime_, writeInterval_));
-  return std::ceil(
+      currentTime - *scheduledWriteTime_ + writeInterval_);
+  cachedBatchSize_ = std::ceil(
       adjustedInterval.count() * batchSize_ * 1.0 / writeInterval_.count());
+  if (cachedBatchSize_ < batchSize_) {
+    LOG(ERROR)
+        << "Quic pacer batch size calculation: cachedBatchSize < batchSize";
+  }
+  tokens_ +=
+      (cachedBatchSize_ > batchSize_ ? cachedBatchSize_ - batchSize_ : 0);
+  return tokens_;
 }
 
 uint64_t DefaultPacer::getCachedWriteBatchSize() const {
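
The restored drift compensation can be checked by hand against the CompensateTimerDrift test later in this commit (a worked example; it assumes that test's rate calculator returns a 1000us interval with a burst size of 10, which is consistent with the expectations shown there). For a write scheduled at time t that actually fires at t + 1000us:

    adjustedInterval = (t + 1000us) - t + 1000us = 2000us
    cachedBatchSize_ = ceil(2000 * 10 / 1000)    = 20
    tokens_         += 20 - 10                   = 10 extra tokens

A timer that fires one full interval late therefore earns one extra interval's worth of burst, banked in tokens_ rather than lost.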


@@ -8,7 +8,6 @@
 #pragma once
 
-#include <quic/congestion_control/Bandwidth.h>
 #include <quic/state/StateData.h>
 
 namespace quic {
@@ -32,6 +31,8 @@ class DefaultPacer : public Pacer {
   void refreshPacingRate(uint64_t cwndBytes, std::chrono::microseconds rtt)
       override;
 
+  void onPacedWriteScheduled(TimePoint currentTime) override;
+
   std::chrono::microseconds getTimeUntilNextWrite() const override;
 
   uint64_t updateAndGetWriteBatchSize(TimePoint currentTime) override;
@@ -42,12 +43,9 @@ class DefaultPacer : public Pacer {
   void setAppLimited(bool limited) override;
 
-  void onPacketSent(uint64_t bytesSent) override;
+  void onPacketSent() override;
 
   void onPacketsLoss() override;
 
-  // Only used for test:
-  bool isPacingLimited() const noexcept;
-
  private:
   const QuicConnectionStateBase& conn_;
   uint64_t minCwndInMss_;
@@ -58,11 +56,5 @@ class DefaultPacer : public Pacer {
   uint64_t cachedBatchSize_;
   bool appLimited_{false};
   uint64_t tokens_;
-  uint64_t bytesSentSincePacingRateUpdate_{0};
-  folly::Optional<TimePoint> lastPacingRateUpdate_;
-  bool pacingLimited_{false};
-  TimePoint nextWriteTime_;
-  TimePoint lastWriteTime_;
-  bool firstUpdate_{true};
 };
 
 } // namespace quic


@@ -43,7 +43,9 @@ TEST_F(CongestionControlFunctionsTest, MinPacingRate) {
   conn.transportSettings.pacingTimerTickInterval = 1ms;
   auto result = calculatePacingRate(
       conn, 100, conn.transportSettings.minCwndInMss, 100ms);
-  EXPECT_EQ(1ms, result.interval);
+  // 100 ms rtt, 1ms tick interval, 100 mss cwnd, 5 mss min burst -> 5 mss every
+  // 5ms
+  EXPECT_EQ(5ms, result.interval);
   EXPECT_EQ(conn.transportSettings.minBurstPackets, result.burstSize);
 }
@@ -54,7 +56,7 @@ TEST_F(CongestionControlFunctionsTest, SmallCwnd) {
   conn.transportSettings.pacingTimerTickInterval = 1ms;
   auto result = calculatePacingRate(
       conn, 10, conn.transportSettings.minCwndInMss, 100000us);
-  EXPECT_EQ(1ms, result.interval);
+  EXPECT_EQ(10ms, result.interval);
   EXPECT_EQ(1, result.burstSize);
 }
@@ -70,5 +72,6 @@ TEST_F(CongestionControlFunctionsTest, RttSmallerThanInterval) {
       conn.transportSettings.writeConnectionDataPacketsLimit, result.burstSize);
 }
 
 } // namespace test
 } // namespace quic
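
The SmallCwnd expectation follows from the same restored formula (worked by hand; this assumes the parts of the test setup not visible in the hunk leave cwndInPackets = 10 and a minimum burst of 1): with rtt = 100ms and a 1ms tick,

    burstPerInterval = max(1, ceil(10 * 1ms / 100ms)) = 1
    interval         = max(1ms, 100ms * 1 / 10)       = 10ms

that is, one packet every 10ms, or exactly 10 packets per RTT.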


@@ -17,7 +17,7 @@ namespace test {
 namespace {
 void consumeTokensHelper(Pacer& pacer, size_t tokensToConsume) {
   for (size_t i = 0; i < tokensToConsume; i++) {
-    pacer.onPacketSent(1000);
+    pacer.onPacketSent();
   }
 }
 } // namespace
@@ -45,19 +45,16 @@ TEST_F(PacerTest, RateCalculator) {
       uint64_t,
       uint64_t,
       std::chrono::microseconds) {
-    return PacingRate::Builder().setInterval(500ms).setBurstSize(4321).build();
+    return PacingRate::Builder().setInterval(1234us).setBurstSize(4321).build();
   });
-  auto currentTime = Clock::now();
   pacer.refreshPacingRate(200000, 200us);
   EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
-  EXPECT_EQ(4321, pacer.updateAndGetWriteBatchSize(currentTime));
-  consumeTokensHelper(pacer, 4321);
-  EXPECT_NEAR(
-      std::chrono::duration_cast<std::chrono::microseconds>(
-          500ms + currentTime - Clock::now())
-          .count(),
-      pacer.getTimeUntilNextWrite().count(),
-      2000);
+  EXPECT_EQ(
+      4321 + conn.transportSettings.writeConnectionDataPacketsLimit,
+      pacer.updateAndGetWriteBatchSize(Clock::now()));
+  consumeTokensHelper(
+      pacer, 4321 + conn.transportSettings.writeConnectionDataPacketsLimit);
+  EXPECT_EQ(1234us, pacer.getTimeUntilNextWrite());
 }
 
 TEST_F(PacerTest, CompensateTimerDrift) {
@@ -69,18 +66,23 @@ TEST_F(PacerTest, CompensateTimerDrift) {
   });
   auto currentTime = Clock::now();
   pacer.refreshPacingRate(20, 100us); // These two values do not matter here
-  // After refresh, both last and next write time is very close to currentTime
-  EXPECT_NEAR(10, pacer.updateAndGetWriteBatchSize(currentTime + 1000us), 2);
-  // lastWriteTime ~= currentTime, nextWriteTime ~= currentTime + 1000us
-  EXPECT_NEAR(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us), 2);
-  // lastWriteTime ~= currentTime + 1000us, nextWriteTime ~= currentTime +
-  // 2000us
+  pacer.onPacedWriteScheduled(currentTime);
+  EXPECT_EQ(
+      20 + conn.transportSettings.writeConnectionDataPacketsLimit,
+      pacer.updateAndGetWriteBatchSize(currentTime + 1000us));
+  // Query batch size again without calling onPacedWriteScheduled won't do timer
+  // drift compensation. But token_ keeps the last compenstation.
+  EXPECT_EQ(
+      20 + conn.transportSettings.writeConnectionDataPacketsLimit,
+      pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
 
   // Consume a few:
   consumeTokensHelper(pacer, 3);
-  EXPECT_NEAR(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us), 2);
+  EXPECT_EQ(
+      20 + conn.transportSettings.writeConnectionDataPacketsLimit - 3,
+      pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
 }
 
 TEST_F(PacerTest, NextWriteTime) {
@@ -92,18 +94,17 @@ TEST_F(PacerTest, NextWriteTime) {
       std::chrono::microseconds rtt) {
     return PacingRate::Builder().setInterval(rtt).setBurstSize(10).build();
   });
-  auto currentTime = Clock::now();
-  pacer.refreshPacingRate(20, 100ms);
+  pacer.refreshPacingRate(20, 1000us);
   // Right after refresh, it's always 0us. You can always send right after an
   // ack.
   EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
-  pacer.updateAndGetWriteBatchSize(currentTime);
 
   // Consume all the tokens:
-  consumeTokensHelper(pacer, 10);
+  consumeTokensHelper(
+      pacer, 10 + conn.transportSettings.writeConnectionDataPacketsLimit);
 
   // Then we use real delay:
-  EXPECT_NEAR(100 * 1000, pacer.getTimeUntilNextWrite().count(), 1000);
+  EXPECT_EQ(1000us, pacer.getTimeUntilNextWrite());
 }
 
 TEST_F(PacerTest, ImpossibleToPace) {
@@ -137,16 +138,17 @@ TEST_F(PacerTest, CachedBatchSize) {
             .setBurstSize(cwndBytes / conn.udpSendPacketLen * 2)
             .build();
   });
-  auto currentTime = Clock::now();
   pacer.refreshPacingRate(20 * conn.udpSendPacketLen, 100ms);
   EXPECT_EQ(40, pacer.getCachedWriteBatchSize());
+  auto currentTime = Clock::now();
+  pacer.onPacedWriteScheduled(currentTime);
   pacer.updateAndGetWriteBatchSize(currentTime);
-  // lastWriteTime ~= currentTime, nextWriteTime_ ~= currentTime + 100ms
   EXPECT_EQ(40, pacer.getCachedWriteBatchSize());
-  EXPECT_EQ(80, pacer.updateAndGetWriteBatchSize(currentTime + 200ms));
-  EXPECT_EQ(40, pacer.getCachedWriteBatchSize());
+  pacer.onPacedWriteScheduled(currentTime + 100ms);
+  pacer.updateAndGetWriteBatchSize(currentTime + 200ms);
+  EXPECT_EQ(80, pacer.getCachedWriteBatchSize());
 }
 
 TEST_F(PacerTest, AppLimited) {
@@ -156,19 +158,48 @@ TEST_F(PacerTest, AppLimited) {
   EXPECT_EQ(12, pacer.updateAndGetWriteBatchSize(Clock::now()));
 }
 
-TEST_F(PacerTest, PacingLimited) {
-  pacer.setPacingRateCalculator([](const QuicConnectionStateBase& conn,
-                                   uint64_t cwndBytes,
+TEST_F(PacerTest, Tokens) {
+  // Pacer has tokens right after init:
+  EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
+  EXPECT_EQ(
+      conn.transportSettings.writeConnectionDataPacketsLimit,
+      pacer.updateAndGetWriteBatchSize(Clock::now()));
+
+  // Consume all initial tokens:
+  consumeTokensHelper(
+      pacer, conn.transportSettings.writeConnectionDataPacketsLimit);
+
+  // Pacing rate: 10 mss per 10 ms
+  pacer.setPacingRateCalculator([](const QuicConnectionStateBase&,
+                                   uint64_t,
                                    uint64_t,
-                                   std::chrono::microseconds rtt) {
-    return PacingRate::Builder()
-        .setInterval(rtt)
-        .setBurstSize(cwndBytes / conn.udpSendPacketLen)
-        .build();
+                                   std::chrono::microseconds) {
    return PacingRate::Builder().setInterval(10ms).setBurstSize(10).build();
   });
-  pacer.refreshPacingRate(2000 * conn.udpSendPacketLen, 1us);
-  pacer.onPacketSent(1);
-  EXPECT_TRUE(pacer.isPacingLimited());
+  // These input doesn't matter, the rate calculator above returns fixed values.
+  pacer.refreshPacingRate(100, 100ms);
+  EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
+  EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(Clock::now()));
+
+  // Consume all tokens:
+  consumeTokensHelper(pacer, 10);
+
+  EXPECT_EQ(10ms, pacer.getTimeUntilNextWrite());
+  EXPECT_EQ(0, pacer.updateAndGetWriteBatchSize(Clock::now()));
+
+  // Do a schedule:
+  auto curTime = Clock::now();
+  pacer.onPacedWriteScheduled(curTime);
+  // 10ms later you should have 10 mss credit:
+  EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(curTime + 10ms));
+
+  // Schedule again from this point:
+  pacer.onPacedWriteScheduled(curTime + 10ms);
+  // Then elapse another 10ms, and previous tokens hasn't been used:
+  EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(curTime + 20ms));
 }
 
 } // namespace test
 } // namespace quic


@@ -179,6 +179,14 @@ struct Pacer {
       uint64_t cwndBytes,
       std::chrono::microseconds rtt) = 0;
 
+  /**
+   * Notify the Pacer that a paced write is scheduled.
+   *
+   * currentTime: the time that the timer is scheduled. NOT the time that a
+   * write is scheduled to happen.
+   */
+  virtual void onPacedWriteScheduled(TimePoint currentTime) = 0;
+
   /**
    * API for Trnasport to query the interval before next write
    */
@@ -200,7 +208,7 @@ struct Pacer {
   virtual uint64_t getCachedWriteBatchSize() const = 0;
 
   virtual void setAppLimited(bool limited) = 0;
-  virtual void onPacketSent(uint64_t sentBytes) = 0;
+  virtual void onPacketSent() = 0;
   virtual void onPacketsLoss() = 0;
 };
@@ -208,7 +216,6 @@ struct PacingRate {
   std::chrono::microseconds interval{0us};
   uint64_t burstSize{0};
 
-  PacingRate() = default;
   struct Builder {
     Builder&& setInterval(std::chrono::microseconds interval) &&;
     Builder&& setBurstSize(uint64_t burstSize) &&;
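
Taken together, the restored interface describes one pacing cycle. A minimal sketch of how a transport could drive it (hypothetical driver code for illustration; sleepFor(), hasDataToWrite() and writeOnePacket() are stand-ins, not mvfst functions), mirroring the call order in the QuicTransportBase hunk above:

    void pacedWriteCycle(Pacer& pacer) {
      // Tell the pacer when its timer is armed, then wait for the returned
      // delay (0us means it is fine to write immediately).
      pacer.onPacedWriteScheduled(Clock::now());
      sleepFor(pacer.getTimeUntilNextWrite());
      // The pacer compensates for drift between the scheduled time and the
      // actual fire time when computing the batch.
      uint64_t batch = pacer.updateAndGetWriteBatchSize(Clock::now());
      for (uint64_t i = 0; i < batch && hasDataToWrite(); i++) {
        writeOnePacket();
        pacer.onPacketSent(); // consume one pacing token
      }
    }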


@@ -34,11 +34,12 @@ class MockCongestionController : public CongestionController {
 class MockPacer : public Pacer {
  public:
   MOCK_METHOD2(refreshPacingRate, void(uint64_t, std::chrono::microseconds));
+  MOCK_METHOD1(onPacedWriteScheduled, void(TimePoint));
   MOCK_CONST_METHOD0(getTimeUntilNextWrite, std::chrono::microseconds());
   MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint));
   MOCK_CONST_METHOD0(getCachedWriteBatchSize, uint64_t());
   MOCK_METHOD1(setAppLimited, void(bool));
-  MOCK_METHOD1(onPacketSent, void(uint64_t));
+  MOCK_METHOD0(onPacketSent, void());
   MOCK_METHOD0(onPacketsLoss, void());
 };