1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Add CWND and writable bytes to PacketsWrittenEvent

Summary: Extend `PacketsWrittenEvent` to include the CWND and number of writable bytes from the congestion controller after the write completed. If no congestion controller is installed, these values are left blank.

Differential Revision: D32588533

fbshipit-source-id: 4b8361937fcab36daa17e5f6dc4386762987d2b4
This commit is contained in:
Brandon Schlinker
2022-03-28 23:26:30 -07:00
committed by Facebook GitHub Bot
parent f30df88637
commit da71a15307
7 changed files with 1038 additions and 24 deletions

View File

@@ -19,6 +19,7 @@
#include <quic/common/BufUtil.h>
#include <quic/common/Timers.h>
#include <quic/common/test/TestUtils.h>
#include <quic/congestion_control/StaticCwndCongestionController.h>
#include <quic/dsr/Types.h>
#include <quic/dsr/test/Mocks.h>
#include <quic/handshake/test/Mocks.h>
@@ -1060,6 +1061,266 @@ TEST_F(QuicTransportTest, ObserverPacketsWrittenCheckBytesSent) {
transport_ = nullptr;
}
TEST_F(QuicTransportTest, ObserverWriteEventsCheckCwndPacketsWritable) {
InSequence s;
Observer::Config config = {};
config.packetsWrittenEvents = true;
config.appRateLimitedEvents = true;
auto cb1 = std::make_unique<StrictMock<MockObserver>>(config);
auto cb2 = std::make_unique<StrictMock<MockObserver>>(config);
auto cb3 = std::make_unique<StrictMock<MockObserver>>(Observer::Config());
const auto invokeForAllObservers =
[&cb1, &cb2, &cb3](const std::function<void(MockObserver&)>& fn) {
fn(*cb1);
fn(*cb2);
fn(*cb3);
};
const auto invokeForEachObserverWithTestEvents =
[&cb1, &cb2](const std::function<void(MockObserver&)>& fn) {
fn(*cb1);
fn(*cb2);
};
// install observers
invokeForAllObservers(([this](MockObserver& observer) {
EXPECT_CALL(observer, observerAttach(transport_.get()));
transport_->addObserver(&observer);
}));
EXPECT_THAT(
transport_->getObservers(),
UnorderedElementsAre(cb1.get(), cb2.get(), cb3.get()));
auto& conn = transport_->getConnectionState();
// install StaticCwndCongestionController
const auto cwndInBytes = 10000;
conn.congestionController = std::make_unique<StaticCwndCongestionController>(
StaticCwndCongestionController::CwndInBytes(cwndInBytes));
// update writeNum and upperBoundCurrentBytesWritable after each write/ACK
uint64_t writeNum = 1;
uint64_t upperBoundCurrentBytesWritable = cwndInBytes;
// write of 4000 stream bytes
{
const auto bytesToWrite = 4000;
// matcher for event from startWritingFromAppLimited
const auto startWritingFromAppLimitedMatcher = AllOf(
testing::Property(
&Observer::WriteEvent::getOutstandingPackets, testing::IsEmpty()),
testing::Field(
&Observer::WriteEvent::writeCount, testing::Eq(writeNum)),
testing::Field(
&Observer::WriteEvent::maybeCwndInBytes,
testing::Eq(folly::Optional<uint64_t>(cwndInBytes))),
testing::Field(
&Observer::WriteEvent::maybeWritableBytes,
testing::Eq(folly::Optional<uint64_t>(cwndInBytes))));
// matcher for event from packetsWritten
const auto packetsWrittenMatcher = AllOf(
testing::Property(
&Observer::WriteEvent::getOutstandingPackets, testing::SizeIs(4)),
testing::Field(
&Observer::WriteEvent::writeCount, testing::Eq(writeNum)),
testing::Field(
&Observer::WriteEvent::maybeCwndInBytes,
testing::Eq(folly::Optional<uint64_t>(cwndInBytes))),
testing::Field( // precise check below
&Observer::WriteEvent::maybeWritableBytes,
testing::Lt(folly::Optional<uint64_t>(
upperBoundCurrentBytesWritable - bytesToWrite))),
testing::Field(
&Observer::PacketsWrittenEvent::numPacketsWritten, testing::Eq(4)),
testing::Field(
&Observer::PacketsWrittenEvent::numAckElicitingPacketsWritten,
testing::Eq(4)),
testing::Field(
&Observer::PacketsWrittenEvent::numBytesWritten,
testing::Gt(bytesToWrite)));
// matcher for event from appRateLimited
const auto appRateLimitedMatcher = AllOf(
testing::Property(
&Observer::WriteEvent::getOutstandingPackets, testing::SizeIs(4)),
testing::Field(
&Observer::WriteEvent::writeCount, testing::Eq(writeNum)),
testing::Field(
&Observer::WriteEvent::maybeCwndInBytes,
testing::Eq(folly::Optional<uint64_t>(cwndInBytes))),
testing::Field( // precise check below
&Observer::WriteEvent::maybeWritableBytes,
testing::Lt(folly::Optional<uint64_t>(
upperBoundCurrentBytesWritable - bytesToWrite))));
invokeForEachObserverWithTestEvents(
([this, &startWritingFromAppLimitedMatcher](MockObserver& observer) {
EXPECT_CALL(
observer,
startWritingFromAppLimited(
transport_.get(), startWritingFromAppLimitedMatcher));
}));
invokeForEachObserverWithTestEvents(
([this,
&packetsWrittenMatcher,
cwndInBytes,
oldTInfo = transport_->getTransportInfo()](MockObserver& observer) {
EXPECT_CALL(
observer, packetsWritten(transport_.get(), packetsWrittenMatcher))
.WillOnce(([cwndInBytes, oldTInfo](
const auto& socket, const auto& event) {
EXPECT_EQ(
cwndInBytes - socket->getTransportInfo().bytesSent -
oldTInfo.bytesSent,
event.maybeWritableBytes);
}));
}));
invokeForEachObserverWithTestEvents(
([this,
&appRateLimitedMatcher,
cwndInBytes,
oldTInfo = transport_->getTransportInfo()](MockObserver& observer) {
EXPECT_CALL(
observer, appRateLimited(transport_.get(), appRateLimitedMatcher))
.WillOnce(([cwndInBytes, oldTInfo](
const auto& socket, const auto& event) {
EXPECT_EQ(
cwndInBytes - socket->getTransportInfo().bytesSent -
oldTInfo.bytesSent,
event.maybeWritableBytes);
}));
}));
auto stream = transport_->createBidirectionalStream().value();
transport_->writeChain(
stream, buildRandomInputData(bytesToWrite), false, nullptr);
transport_->updateWriteLooper(true);
loopForWrites();
loopForWrites();
// remove bytesToWrite from upperBoundCurrentBytesWritable
upperBoundCurrentBytesWritable -= bytesToWrite;
writeNum++;
}
// another write of 1000 stream bytes
{
const auto bytesToWrite = 1000;
// matcher for event from startWritingFromAppLimited
const auto startWritingFromAppLimitedMatcher = AllOf(
testing::Property(
&Observer::WriteEvent::getOutstandingPackets, testing::SizeIs(4)),
testing::Field(
&Observer::WriteEvent::writeCount, testing::Eq(writeNum)),
testing::Field(
&Observer::WriteEvent::maybeCwndInBytes,
testing::Eq(folly::Optional<uint64_t>(cwndInBytes))),
testing::Field(
&Observer::WriteEvent::maybeWritableBytes,
testing::Lt(
folly::Optional<uint64_t>(upperBoundCurrentBytesWritable))));
// matcher for event from packetsWritten
const auto packetsWrittenMatcher = AllOf(
testing::Property(
&Observer::WriteEvent::getOutstandingPackets, testing::SizeIs(5)),
testing::Field(
&Observer::WriteEvent::writeCount, testing::Eq(writeNum)),
testing::Field(
&Observer::WriteEvent::maybeCwndInBytes,
testing::Eq(folly::Optional<uint64_t>(cwndInBytes))),
testing::Field( // precise check below
&Observer::WriteEvent::maybeWritableBytes,
testing::Lt(folly::Optional<uint64_t>(
upperBoundCurrentBytesWritable - bytesToWrite))),
testing::Field(
&Observer::PacketsWrittenEvent::numPacketsWritten, testing::Eq(1)),
testing::Field(
&Observer::PacketsWrittenEvent::numAckElicitingPacketsWritten,
testing::Eq(1)),
testing::Field(
&Observer::PacketsWrittenEvent::numBytesWritten,
testing::Gt(bytesToWrite)));
// matcher for event from appRateLimited
const auto appRateLimitedMatcher = AllOf(
testing::Property(
&Observer::WriteEvent::getOutstandingPackets, testing::SizeIs(5)),
testing::Field(
&Observer::WriteEvent::writeCount, testing::Eq(writeNum)),
testing::Field(
&Observer::WriteEvent::maybeCwndInBytes,
testing::Eq(folly::Optional<uint64_t>(cwndInBytes))),
testing::Field( // precise check below
&Observer::WriteEvent::maybeWritableBytes,
testing::Lt(folly::Optional<uint64_t>(
upperBoundCurrentBytesWritable - bytesToWrite))));
invokeForEachObserverWithTestEvents(
([this, &startWritingFromAppLimitedMatcher](MockObserver& observer) {
EXPECT_CALL(
observer,
startWritingFromAppLimited(
transport_.get(), startWritingFromAppLimitedMatcher));
}));
invokeForEachObserverWithTestEvents(
([this,
&packetsWrittenMatcher,
oldTInfo = transport_->getTransportInfo()](MockObserver& observer) {
EXPECT_CALL(
observer, packetsWritten(transport_.get(), packetsWrittenMatcher))
.WillOnce(([oldTInfo](const auto& socket, const auto& event) {
EXPECT_EQ(
oldTInfo.writableBytes -
(socket->getTransportInfo().bytesSent -
oldTInfo.bytesSent),
event.maybeWritableBytes);
}));
}));
invokeForEachObserverWithTestEvents(
([this,
&appRateLimitedMatcher,
oldTInfo = transport_->getTransportInfo()](MockObserver& observer) {
EXPECT_CALL(
observer, appRateLimited(transport_.get(), appRateLimitedMatcher))
.WillOnce(([oldTInfo](const auto& socket, const auto& event) {
EXPECT_EQ(
oldTInfo.writableBytes -
(socket->getTransportInfo().bytesSent -
oldTInfo.bytesSent),
event.maybeWritableBytes);
}));
}));
auto stream = transport_->createBidirectionalStream().value();
transport_->writeChain(
stream, buildRandomInputData(bytesToWrite), false, nullptr);
transport_->updateWriteLooper(true);
loopForWrites();
loopForWrites();
// remove bytesToWrite from upperBoundCurrentBytesWritable
upperBoundCurrentBytesWritable -= bytesToWrite;
writeNum++;
}
invokeForAllObservers(([this](MockObserver& observer) {
EXPECT_CALL(observer, close(transport_.get(), _));
}));
invokeForAllObservers(([this](MockObserver& observer) {
EXPECT_CALL(observer, destroy(transport_.get()));
}));
transport_->close(folly::none);
transport_ = nullptr;
}
TEST_F(QuicTransportTest, ObserverStreamEventBidirectionalLocalOpenClose) {
Observer::Config configWithStreamEvents = {};
configWithStreamEvents.streamEvents = true;