
Introduce Tokens in Quic Pacer

Summary:
Add a token value to the pacer, so that when there is not enough
application data to consume the full burst size in the current event loop,
we can accumulate the unused sending credit and use it later when new data
comes in. Each time a packet is sent, we consume 1 token. On packet loss,
we clear all tokens. Each time there is an ack and we refresh the pacing
rate, the token count increases by the calculated burst size. It is also
increased when the timer drifts during writes. When tokens are available,
there is no delay in writing out packets, and the burst size is the
current token amount.
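
A minimal standalone sketch of the token accounting described above, for
illustration only (the class name TokenPacer and its method names are
hypothetical; the real implementation is DefaultPacer, shown in the diff
below):

    #include <chrono>
    #include <cstdint>

    class TokenPacer {
     public:
      explicit TokenPacer(uint64_t initialBurst)
          : batchSize_(initialBurst), tokens_(initialBurst) {}

      // On each ack that refreshes the pacing rate, accumulate one burst
      // worth of sending credit instead of discarding it.
      void refreshPacingRate(uint64_t burstSize) {
        batchSize_ = burstSize;
        tokens_ += burstSize;
      }

      // Every sent packet consumes one token.
      void onPacketSent() {
        if (tokens_ > 0) {
          --tokens_;
        }
      }

      // Loss clears all accumulated credit.
      void onPacketsLoss() {
        tokens_ = 0;
      }

      // With tokens available there is no pacing delay; otherwise wait a
      // full write interval.
      std::chrono::microseconds timeUntilNextWrite(
          std::chrono::microseconds writeInterval) const {
        return tokens_ > 0 ? std::chrono::microseconds(0) : writeInterval;
      }

      // The burst size while tokens remain is the current token amount.
      uint64_t writeBatchSize() const {
        return tokens_;
      }

     private:
      uint64_t batchSize_;
      uint64_t tokens_;
    };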

Reviewed By: siyengar

Differential Revision: D17670053

fbshipit-source-id: 6abc3acce39e0ece90248c52c3d73935a9878e02
Yang Chi
2019-10-10 22:06:02 -07:00
committed by Facebook Github Bot
parent 5389228722
commit 537e178a5f
14 changed files with 202 additions and 9 deletions

View File

@@ -396,6 +396,9 @@ void updateConnection(
conn.congestionController->getCongestionWindow());
}
}
if (conn.pacer && !pureAck) {
conn.pacer->onPacketSent();
}
if (pkt.isHandshake) {
++conn.outstandingHandshakePacketsCount;
conn.lossState.lastHandshakePacketSentTime = pkt.time;

View File

@@ -743,6 +743,9 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithPureAck) {
auto conn = createConn();
conn->qLogger = std::make_shared<quic::FileQLogger>();
auto packet = buildEmptyPacket(*conn, PacketNumberSpace::Handshake);
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn->pacer = std::move(mockPacer);
auto mockCongestionController = std::make_unique<MockCongestionController>();
auto rawController = mockCongestionController.get();
conn->congestionController = std::move(mockCongestionController);
@@ -751,6 +754,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionWithPureAck) {
ackFrame.ackBlocks.insert(10);
packet.packet.frames.push_back(std::move(ackFrame));
EXPECT_CALL(*rawController, onPacketSent(_)).Times(0);
EXPECT_CALL(*rawPacer, onPacketSent()).Times(0);
updateConnection(
*conn, folly::none, packet.packet, TimePoint(), getEncodedSize(packet));
EXPECT_EQ(1, conn->outstandingPackets.size());
@@ -986,6 +990,34 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketWithCC) {
conn->transportSettings.writeConnectionDataPacketsLimit);
}
TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketWithPacer) {
auto conn = createConn();
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn->pacer = std::move(mockPacer);
EventBase evb;
auto socket = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb);
auto rawSocket = socket.get();
auto stream1 = conn->streamManager->createNextBidirectionalStream().value();
auto buf =
IOBuf::copyBuffer("0123456789012012345678901201234567890120123456789012");
writeDataToQuicStream(*stream1, buf->clone(), true);
EXPECT_CALL(*rawPacer, onPacketSent()).Times(1);
EXPECT_CALL(*transportInfoCb_, onWrite(_));
writeQuicDataToSocket(
*rawSocket,
*conn,
*conn->clientConnectionId,
*conn->serverConnectionId,
*aead,
*headerCipher,
getVersion(*conn),
conn->transportSettings.writeConnectionDataPacketsLimit);
}
TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketLimitTest) {
auto conn = createConn();
auto mockCongestionController = std::make_unique<MockCongestionController>();

View File

@@ -158,6 +158,9 @@ void BbrCongestionController::onPacketAckOrLoss(
}
if (lossEvent) {
onPacketLoss(*lossEvent);
if (conn_.pacer) {
conn_.pacer->onPacketsLoss();
}
}
if (ackEvent && ackEvent->largestAckedPacket.hasValue()) {
CHECK(!ackEvent->ackedPackets.empty());

View File

@@ -127,6 +127,9 @@ void Copa::onPacketAckOrLoss(
folly::Optional<LossEvent> loss) {
if (loss) {
onPacketLoss(*loss);
if (conn_.pacer) {
conn_.pacer->onPacketsLoss();
}
QUIC_TRACE(copa_loss, conn_, cwndBytes_, bytesInFlight_);
}
if (ack && ack->largestAckedPacket.hasValue()) {

View File

@@ -89,6 +89,8 @@ void NewReno::onPacketAckOrLoss(
folly::Optional<LossEvent> lossEvent) {
if (lossEvent) {
onPacketLoss(*lossEvent);
// When we start to support pacing in NewReno, we need to call onPacketsLoss
// on the pacer when there is loss.
}
if (ackEvent && ackEvent->largestAckedPacket.hasValue()) {
onAckEvent(*ackEvent);

View File

@@ -20,8 +20,8 @@ DefaultPacer::DefaultPacer(
minCwndInMss_(minCwndInMss),
batchSize_(conn.transportSettings.writeConnectionDataPacketsLimit),
pacingRateCalculator_(calculatePacingRate),
cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit) {
}
cachedBatchSize_(conn.transportSettings.writeConnectionDataPacketsLimit),
tokens_(conn.transportSettings.writeConnectionDataPacketsLimit) {}
// TODO: we choose to keep refreshing pacing rate even when we are app-limited,
// so that when we exit app-limited, we have an updated pacing rate. But I don't
@@ -46,14 +46,25 @@ void DefaultPacer::refreshPacingRate(
pacingRateCalculator_(conn_, cwndBytes, minCwndInMss_, rtt);
writeInterval_ = pacingRate.interval;
batchSize_ = pacingRate.burstSize;
tokens_ += batchSize_;
}
void DefaultPacer::onPacedWriteScheduled(TimePoint currentTime) {
scheduledWriteTime_ = currentTime;
}
void DefaultPacer::onPacketSent() {
if (tokens_) {
--tokens_;
}
}
void DefaultPacer::onPacketsLoss() {
tokens_ = 0UL;
}
std::chrono::microseconds DefaultPacer::getTimeUntilNextWrite() const {
return appLimited_ ? 0us : writeInterval_;
return (appLimited_ || tokens_) ? 0us : writeInterval_;
}
uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) {
@@ -68,13 +79,19 @@ uint64_t DefaultPacer::updateAndGetWriteBatchSize(TimePoint currentTime) {
return batchSize_;
}
if (!scheduledWriteTime_ || *scheduledWriteTime_ >= currentTime) {
return batchSize_;
return tokens_;
}
auto adjustedInterval = std::chrono::duration_cast<std::chrono::microseconds>(
currentTime - *scheduledWriteTime_ + writeInterval_);
cachedBatchSize_ = std::ceil(
adjustedInterval.count() * batchSize_ * 1.0 / writeInterval_.count());
return cachedBatchSize_;
if (cachedBatchSize_ < batchSize_) {
LOG(ERROR)
<< "Quic pacer batch size calculation: cachedBatchSize < batchSize";
}
tokens_ +=
(cachedBatchSize_ > batchSize_ ? cachedBatchSize_ - batchSize_ : 0);
return tokens_;
}
uint64_t DefaultPacer::getCachedWriteBatchSize() const {
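
To see how the drift compensation above feeds tokens_, here is a
standalone check of the arithmetic with assumed numbers (a 10ms write
interval, a burst of 10 packets, and a timer that fires 10ms late,
mirroring the Tokens test below); none of these values come from the diff
itself:

    #include <chrono>
    #include <cmath>
    #include <cstdint>
    #include <iostream>

    int main() {
      using namespace std::chrono;
      const microseconds writeInterval = 10ms; // assumed pacing interval
      const uint64_t batchSize = 10;           // assumed burst size
      const microseconds drift = 10ms; // currentTime - *scheduledWriteTime_

      // adjustedInterval = currentTime - *scheduledWriteTime_ + writeInterval_
      const microseconds adjustedInterval = drift + writeInterval; // 20ms
      const uint64_t cachedBatchSize = std::ceil(
          adjustedInterval.count() * batchSize * 1.0 / writeInterval.count());

      // Only the credit beyond the regular burst is added to tokens_.
      uint64_t tokens = 0;
      tokens += cachedBatchSize > batchSize ? cachedBatchSize - batchSize : 0;

      std::cout << "cachedBatchSize = " << cachedBatchSize // prints 20
                << ", extra tokens = " << tokens << "\n";  // prints 10
      return 0;
    }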

View File

@@ -43,6 +43,9 @@ class DefaultPacer : public Pacer {
void setAppLimited(bool limited) override;
void onPacketSent() override;
void onPacketsLoss() override;
private:
const QuicConnectionStateBase& conn_;
uint64_t minCwndInMss_;
@@ -52,5 +55,6 @@ class DefaultPacer : public Pacer {
PacingRateCalculator pacingRateCalculator_;
uint64_t cachedBatchSize_;
bool appLimited_{false};
uint64_t tokens_;
};
} // namespace quic

View File

@@ -384,6 +384,9 @@ void Cubic::onPacketAckOrLoss(
// future.
if (lossEvent) {
onPacketLoss(*lossEvent);
if (conn_.pacer) {
conn_.pacer->onPacketsLoss();
}
}
if (ackEvent && ackEvent->largestAckedPacket.hasValue()) {
CHECK(!ackEvent->ackedPackets.empty());

View File

@@ -667,5 +667,21 @@ TEST_F(BbrTest, UpdatePacerAppLimited) {
makeAck(0, 1000, Clock::now(), Clock::now() - 5ms), folly::none);
}
TEST_F(BbrTest, PacketLossInvokesPacer) {
QuicConnectionStateBase conn(QuicNodeType::Client);
BbrCongestionController::BbrConfig config;
BbrCongestionController bbr(conn, config);
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn.pacer = std::move(mockPacer);
auto packet = makeTestingWritePacket(0, 1000, 1000);
bbr.onPacketSent(packet);
EXPECT_CALL(*rawPacer, onPacketsLoss()).Times(1);
CongestionController::LossEvent lossEvent;
lossEvent.addLostPacket(packet);
bbr.onPacketAckOrLoss(folly::none, lossEvent);
}
} // namespace test
} // namespace quic

View File

@@ -10,6 +10,7 @@
#include <folly/portability/GTest.h>
#include <quic/common/test/TestUtils.h>
#include <quic/state/test/Mocks.h>
using namespace testing;
@@ -455,5 +456,19 @@ TEST_F(CopaTest, NoLargestAckedPacketNoCrash) {
EXPECT_EQ(event->congestionEvent, kCongestionPacketLoss);
}
TEST_F(CopaTest, PacketLossInvokesPacer) {
QuicServerConnectionState conn;
Copa copa(conn);
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn.pacer = std::move(mockPacer);
auto packet = createPacket(0 /* packetNum */, 1000, 1000);
copa.onPacketSent(packet);
EXPECT_CALL(*rawPacer, onPacketsLoss()).Times(1);
CongestionController::LossEvent lossEvent;
lossEvent.addLostPacket(packet);
copa.onPacketAckOrLoss(folly::none, lossEvent);
}
} // namespace test
} // namespace quic

View File

@@ -303,5 +303,20 @@ TEST_F(CubicTest, PacingGain) {
auto event = dynamic_cast<QLogTransportStateUpdateEvent*>(tmp.get());
EXPECT_EQ(event->update, kRecalculateTimeToOrigin);
}
TEST_F(CubicTest, PacketLossInvokesPacer) {
QuicConnectionStateBase conn(QuicNodeType::Client);
auto mockPacer = std::make_unique<MockPacer>();
auto rawPacer = mockPacer.get();
conn.pacer = std::move(mockPacer);
Cubic cubic(conn);
auto packet = makeTestingWritePacket(0, 1000, 1000);
cubic.onPacketSent(packet);
EXPECT_CALL(*rawPacer, onPacketsLoss()).Times(1);
CongestionController::LossEvent lossEvent;
lossEvent.addLostPacket(packet);
cubic.onPacketAckOrLoss(folly::none, lossEvent);
}
} // namespace test
} // namespace quic

View File

@@ -14,6 +14,14 @@ using namespace testing;
namespace quic {
namespace test {
namespace {
void consumeTokensHelper(Pacer& pacer, size_t tokensToConsume) {
for (size_t i = 0; i < tokensToConsume; i++) {
pacer.onPacketSent();
}
}
} // namespace
class PacerTest : public Test {
public:
void SetUp() override {
@@ -40,8 +48,13 @@ TEST_F(PacerTest, RateCalculator) {
return PacingRate::Builder().setInterval(1234us).setBurstSize(4321).build();
});
pacer.refreshPacingRate(200000, 200us);
EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
EXPECT_EQ(
4321 + conn.transportSettings.writeConnectionDataPacketsLimit,
pacer.updateAndGetWriteBatchSize(Clock::now()));
consumeTokensHelper(
pacer, 4321 + conn.transportSettings.writeConnectionDataPacketsLimit);
EXPECT_EQ(1234us, pacer.getTimeUntilNextWrite());
EXPECT_EQ(4321, pacer.updateAndGetWriteBatchSize(Clock::now()));
}
TEST_F(PacerTest, CompensateTimerDrift) {
@@ -54,11 +67,22 @@ TEST_F(PacerTest, CompensateTimerDrift) {
auto currentTime = Clock::now();
pacer.refreshPacingRate(20, 100us); // These two values do not matter here
pacer.onPacedWriteScheduled(currentTime);
EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(currentTime + 1000us));
EXPECT_EQ(
20 + conn.transportSettings.writeConnectionDataPacketsLimit,
pacer.updateAndGetWriteBatchSize(currentTime + 1000us));
// Querying batch size again without calling onPacedWriteScheduled won't do timer
// drift compensation
EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
// drift compensation. But tokens_ keeps the last compensation.
EXPECT_EQ(
20 + conn.transportSettings.writeConnectionDataPacketsLimit,
pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
// Consume a few:
consumeTokensHelper(pacer, 3);
EXPECT_EQ(
20 + conn.transportSettings.writeConnectionDataPacketsLimit - 3,
pacer.updateAndGetWriteBatchSize(currentTime + 2000us));
}
TEST_F(PacerTest, NextWriteTime) {
@@ -71,6 +95,15 @@ TEST_F(PacerTest, NextWriteTime) {
return PacingRate::Builder().setInterval(rtt).setBurstSize(10).build();
});
pacer.refreshPacingRate(20, 1000us);
// Right after refresh, it's always 0us. You can always send right after an
// ack.
EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
// Consume all the tokens:
consumeTokensHelper(
pacer, 10 + conn.transportSettings.writeConnectionDataPacketsLimit);
// Then we use real delay:
EXPECT_EQ(1000us, pacer.getTimeUntilNextWrite());
}
@@ -125,5 +158,48 @@ TEST_F(PacerTest, AppLimited) {
EXPECT_EQ(12, pacer.updateAndGetWriteBatchSize(Clock::now()));
}
TEST_F(PacerTest, Tokens) {
// Pacer has tokens right after init:
EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
EXPECT_EQ(
conn.transportSettings.writeConnectionDataPacketsLimit,
pacer.updateAndGetWriteBatchSize(Clock::now()));
// Consume all initial tokens:
consumeTokensHelper(
pacer, conn.transportSettings.writeConnectionDataPacketsLimit);
// Pacing rate: 10 mss per 10 ms
pacer.setPacingRateCalculator([](const QuicConnectionStateBase&,
uint64_t,
uint64_t,
std::chrono::microseconds) {
return PacingRate::Builder().setInterval(10ms).setBurstSize(10).build();
});
// These inputs don't matter; the rate calculator above returns fixed values.
pacer.refreshPacingRate(100, 100ms);
EXPECT_EQ(0us, pacer.getTimeUntilNextWrite());
EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(Clock::now()));
// Consume all tokens:
consumeTokensHelper(pacer, 10);
EXPECT_EQ(10ms, pacer.getTimeUntilNextWrite());
EXPECT_EQ(0, pacer.updateAndGetWriteBatchSize(Clock::now()));
// Do a schedule:
auto curTime = Clock::now();
pacer.onPacedWriteScheduled(curTime);
// 10ms later you should have 10 mss credit:
EXPECT_EQ(10, pacer.updateAndGetWriteBatchSize(curTime + 10ms));
// Schedule again from this point:
pacer.onPacedWriteScheduled(curTime + 10ms);
// Then elapse another 10ms; the previous tokens haven't been used:
EXPECT_EQ(20, pacer.updateAndGetWriteBatchSize(curTime + 20ms));
}
} // namespace test
} // namespace quic

View File

@@ -165,6 +165,8 @@ struct Pacer {
virtual uint64_t getCachedWriteBatchSize() const = 0;
virtual void setAppLimited(bool limited) = 0;
virtual void onPacketSent() = 0;
virtual void onPacketsLoss() = 0;
};
struct PacingRate {

View File

@@ -40,6 +40,8 @@ class MockPacer : public Pacer {
MOCK_METHOD1(updateAndGetWriteBatchSize, uint64_t(TimePoint));
MOCK_CONST_METHOD0(getCachedWriteBatchSize, uint64_t());
MOCK_METHOD1(setAppLimited, void(bool));
MOCK_METHOD0(onPacketSent, void());
MOCK_METHOD0(onPacketsLoss, void());
};
} // namespace test
} // namespace quic