Experiment with longer timer compensation in Quic Pacer
Summary: Our current timer compensation works as follows: Pacer schedules a write -> Pacer marks the scheduled time T0 -> timer fires -> Pacer uses (now - T0 + writeInterval) to calculate the burst size -> transport writes a burst worth of data at time T1 -> Pacer schedules again. This diff changes it to: Pacer schedules a write -> timer fires -> Pacer uses (now - previous T1' + writeInterval) to calculate the burst size -> transport writes a burst worth of data at T1 -> Pacer schedules again. Because T1' < T0 < T1, this compensates for the timer more. With the higher compensation from the timer interval calculation, the `tokens_ += batchSize_;` accumulation inside refreshPacingRate is replaced by a plain assignment in this diff.

Reviewed By: yangchi

Differential Revision: D22532672

fbshipit-source-id: 6547298e933965ab412d944cfd65d5c60f4dced7
committed by Facebook GitHub Bot
parent 018ff668ad
commit 8b007886df
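To make the two compensation windows concrete, here is a minimal standalone sketch (not mvfst code; the `burstSize` helper, its parameters, and the timeline values are illustrative assumptions). It reproduces the ceil-based scaling used by the pacer and shows why anchoring at the previous write time T1' rather than the scheduled time T0 yields a larger burst:

#include <chrono>
#include <cmath>
#include <cstdint>
#include <iostream>

using namespace std::chrono;

// Burst size for one pacing window: scale batchSize by the elapsed time plus
// one writeInterval, relative to writeInterval itself (mirrors the ceil() in
// DefaultPacer::updateAndGetWriteBatchSize).
uint64_t burstSize(
    microseconds elapsed,
    microseconds writeInterval,
    uint64_t batchSize) {
  auto adjusted = elapsed + writeInterval;
  return static_cast<uint64_t>(
      std::ceil(adjusted.count() * batchSize * 1.0 / writeInterval.count()));
}

int main() {
  const microseconds writeInterval{1000};
  const uint64_t batchSize = 10;

  // Hypothetical timeline: previous write at T1', write scheduled at T0,
  // timer fires at `now` (timers typically overshoot, so T1' < T0 < now).
  const microseconds t1Prev{0};
  const microseconds t0{300};
  const microseconds now{1200};

  // Old scheme: window measured from the scheduled time T0.
  std::cout << burstSize(now - t0, writeInterval, batchSize) << "\n"; // 19
  // New scheme: window measured from the previous write T1' -> bigger burst.
  std::cout << burstSize(now - t1Prev, writeInterval, batchSize) << "\n"; // 22
  return 0;
}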
@@ -49,7 +49,6 @@ QuicTransportBase::QuicTransportBase(
           LooperType::WriteLooper)) {
   writeLooper_->setPacingFunction([this]() -> auto {
     if (isConnectionPaced(*conn_)) {
-      conn_->pacer->onPacedWriteScheduled(Clock::now());
       return conn_->pacer->getTimeUntilNextWrite();
     }
     return 0us;
@@ -2943,7 +2943,6 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) {
   EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
   EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
       .WillRepeatedly(Return(1));
-  EXPECT_CALL(*rawPacer, onPacedWriteScheduled(_));
   EXPECT_CALL(*rawPacer, getTimeUntilNextWrite())
       .WillRepeatedly(Return(3600000ms));
   // This will write out 100 bytes, leave 100 bytes behind. FunctionLooper will
@@ -37,7 +37,7 @@ void DefaultPacer::refreshPacingRate(
         pacingRateCalculator_(conn_, cwndBytes, minCwndInMss_, rtt);
     writeInterval_ = pacingRate.interval;
     batchSize_ = pacingRate.burstSize;
-    tokens_ += batchSize_;
+    tokens_ = batchSize_;
   }
   if (conn_.qLogger) {
     conn_.qLogger->addPacingMetricUpdate(batchSize_, writeInterval_);
@@ -66,10 +66,6 @@ void DefaultPacer::setPacingRate(
       conn.transportSettings.pacingTimerTickInterval);
 }
 
-void DefaultPacer::onPacedWriteScheduled(TimePoint currentTime) {
-  scheduledWriteTime_ = currentTime;
-}
-
 void DefaultPacer::onPacketSent() {
   if (tokens_) {
     --tokens_;
@@ -78,6 +74,7 @@ void DefaultPacer::onPacketSent() {
 
 void DefaultPacer::onPacketsLoss() {
   tokens_ = 0UL;
+  lastWriteTime_.reset();
 }
 
 std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const {
@@ -86,16 +83,21 @@ std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const {
 
 uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) {
   SCOPE_EXIT {
-    scheduledWriteTime_.reset();
+    lastWriteTime_ = currentTime;
   };
   if (writeInterval_ == 0us) {
     return batchSize_;
   }
-  if (!scheduledWriteTime_ || *scheduledWriteTime_ >= currentTime) {
+  if (!lastWriteTime_) {
     return tokens_;
   }
+  /**
+   * Don't let `+ writeInterval_` confuse you. A few lines later we use
+   * `cachedBatchSize_ - batchSize_` instead of `cachedBatchSize_` to increase
+   * tokens_.
+   */
   auto adjustedInterval = std::chrono::duration_cast<std::chrono::microseconds>(
-      currentTime - *scheduledWriteTime_ + writeInterval_);
+      currentTime - *lastWriteTime_ + writeInterval_);
   cachedBatchSize_ = std::ceil(
       adjustedInterval.count() * batchSize_ * 1.0 / writeInterval_.count());
   if (cachedBatchSize_ < batchSize_) {
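A quick worked example of the new path, with illustrative values: take writeInterval_ = 100us and batchSize_ = 10, and suppose the timer fires 250us after *lastWriteTime_. Then adjustedInterval = 250us + 100us = 350us and cachedBatchSize_ = ceil(350 * 10 / 100) = 35; since that exceeds batchSize_, tokens_ grows by cachedBatchSize_ - batchSize_ = 25, which is what the comment above is pointing at. The old anchor, scheduledWriteTime_, is never earlier than the previous write, so the old adjusted interval, and hence the credit, could only be smaller or equal.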
@@ -34,8 +34,6 @@ class DefaultPacer : public Pacer {
   // rate_bps is *bytes* per second
   void setPacingRate(QuicConnectionStateBase& conn, uint64_t rate_bps) override;
 
-  void onPacedWriteScheduled(TimePoint currentTime) override;
-
   std::chrono::microseconds getTimeUntilNextWrite() const override;
 
   uint64_t updateAndGetWriteBatchSize(TimePoint currentTime) override;
@@ -52,9 +50,9 @@ class DefaultPacer : public Pacer {
   uint64_t minCwndInMss_;
   uint64_t batchSize_;
   std::chrono::microseconds writeInterval_{0};
-  folly::Optional<TimePoint> scheduledWriteTime_;
   PacingRateCalculator pacingRateCalculator_;
   uint64_t cachedBatchSize_;
   uint64_t tokens_;
+  folly::Optional<TimePoint> lastWriteTime_;
 };
 } // namespace quic
@@ -49,11 +49,8 @@ TEST_F(PacerTest, RateCalculator) {
   });
   pacer.refreshPacingRate(200000, 200us);
   EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
-  EXPECT_EQ(
-      4321 + conn.transportSettings.writeConnectionDataPacketsLimit,
-      pacer.updateAndGetWriteBatchSize(Clock::now()));
-  consumeTokensHelper(
-      pacer, 4321 + conn.transportSettings.writeConnectionDataPacketsLimit);
+  EXPECT_EQ(4321, pacer.updateAndGetWriteBatchSize(Clock::now()));
+  consumeTokensHelper(pacer, 4321);
   EXPECT_EQ(1234us, pacer.getTimeUntilNextWrite());
 }
 
@@ -66,23 +63,8 @@ TEST_F(PacerTest, CompensateTimerDrift) {
   });
   auto currentTime = Clock::now();
   pacer.refreshPacingRate(20, 100us); // These two values do not matter here
-  pacer.onPacedWriteScheduled(currentTime);
-  EXPECT_EQ(
-      20 + conn.transportSettings.writeConnectionDataPacketsLimit,
-      pacer.updateAndGetWriteBatchSize(currentTime + 1000us));
-
-  // Query batch size again without calling onPacedWriteScheduled won't do timer
-  // drift compensation. But tokens_ keeps the last compensation.
-  EXPECT_EQ(
-      20 + conn.transportSettings.writeConnectionDataPacketsLimit,
-      pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
-
-  // Consume a few:
-  consumeTokensHelper(pacer, 3);
-
-  EXPECT_EQ(
-      20 + conn.transportSettings.writeConnectionDataPacketsLimit - 3,
-      pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
+  EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(currentTime + 1000us));
+  EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
 }
 
 TEST_F(PacerTest, NextWriteTime) {
@@ -142,21 +124,20 @@ TEST_F(PacerTest, CachedBatchSize) {
   EXPECT_EQ(40, pacer.getCachedWriteBatchSize());
 
   auto currentTime = Clock::now();
-  pacer.onPacedWriteScheduled(currentTime);
   pacer.updateAndGetWriteBatchSize(currentTime);
   EXPECT_EQ(40, pacer.getCachedWriteBatchSize());
 
-  pacer.onPacedWriteScheduled(currentTime + 100ms);
   pacer.updateAndGetWriteBatchSize(currentTime + 200ms);
-  EXPECT_EQ(80, pacer.getCachedWriteBatchSize());
+  EXPECT_EQ(120, pacer.getCachedWriteBatchSize());
 }
 
 TEST_F(PacerTest, Tokens) {
   // Pacer has tokens right after init:
+  auto currentTime = Clock::now();
   EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
   EXPECT_EQ(
       conn.transportSettings.writeConnectionDataPacketsLimit,
-      pacer.updateAndGetWriteBatchSize(Clock::now()));
+      pacer.updateAndGetWriteBatchSize(currentTime));
 
   // Consume all initial tokens:
   consumeTokensHelper(
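The 80 -> 120 change in CachedBatchSize follows directly from the new anchor (assuming writeInterval_ = 100ms and batchSize_ = 40, consistent with the numbers in this test): previously the window ran from the scheduled time currentTime + 100ms to currentTime + 200ms, giving ceil((100ms + 100ms) * 40 / 100ms) = 80; now it runs from lastWriteTime_ = currentTime, giving ceil((200ms + 100ms) * 40 / 100ms) = 120.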
@@ -174,24 +155,20 @@ TEST_F(PacerTest, Tokens) {
   pacer.refreshPacingRate(100, 100ms);
 
   EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
-  EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(Clock::now()));
+  EXPECT_EQ(10 + 10, pacer.updateAndGetWriteBatchSize(currentTime + 10ms));
 
   // Consume all tokens:
-  consumeTokensHelper(pacer, 10);
+  consumeTokensHelper(pacer, 20);
 
   EXPECT_EQ(10ms, pacer.getTimeUntilNextWrite());
-  EXPECT_EQ(0, pacer.updateAndGetWriteBatchSize(Clock::now()));
+  EXPECT_EQ(0, pacer.updateAndGetWriteBatchSize(currentTime + 10ms));
 
-  // Do a schedule:
-  auto curTime = Clock::now();
-  pacer.onPacedWriteScheduled(curTime);
   // 10ms later you should have 10 mss credit:
-  EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(curTime + 10ms));
+  EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(currentTime + 20ms));
 
   // Schedule again from this point:
-  pacer.onPacedWriteScheduled(curTime + 10ms);
   // Then elapse another 10ms, and previous tokens haven't been used:
-  EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(curTime + 20ms));
+  EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(currentTime + 30ms));
 }
 
 } // namespace test
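The updated Tokens expectations work the same way (assuming writeInterval_ = 10ms and batchSize_ = 10, as the 10ms and 10-credit expectations imply): the earlier query at currentTime recorded lastWriteTime_, so the query at currentTime + 10ms computes ceil((10ms + 10ms) * 10 / 10ms) = 20, i.e. the 10 remaining tokens plus 10 newly credited, hence the `10 + 10`.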
@@ -200,14 +200,6 @@ struct Pacer {
       QuicConnectionStateBase& conn,
       uint64_t rate_bps) = 0;
 
-  /**
-   * Notify the Pacer that a paced write is scheduled.
-   *
-   * currentTime: the time that the timer is scheduled. NOT the time that a
-   * write is scheduled to happen.
-   */
-  virtual void onPacedWriteScheduled(TimePoint currentTime) = 0;
-
   /**
    * API for Transport to query the interval before next write
    */
@@ -35,7 +35,6 @@ class MockPacer : public Pacer {
  public:
   MOCK_METHOD2(refreshPacingRate, void(uint64_t, std::chrono::microseconds));
   MOCK_METHOD2(setPacingRate, void(QuicConnectionStateBase&, uint64_t));
-  MOCK_METHOD1(onPacedWriteScheduled, void(TimePoint));
   MOCK_CONST_METHOD0(getTimeUntilNextWrite, std::chrono::microseconds());
   MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint));
   MOCK_CONST_METHOD0(getCachedWriteBatchSize, uint64_t());