1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Adding packet loss event detection to instrumentationobserver

Summary:
`InstrumentationObserver` is owned by `QuicSocket` now. However, to capture packet loss signal, we need it in `QuicConnectionState`. So, I refactored the code a little bit to make this easy. Changes in this code:
(a)  Moved the definition of `InstrumentationObserver` and `LifeCycleObserver` to
 a separate header `Observer.h`.
(b) Moved the vector of `InstrumentationObserver`s from `QuicSocket` to `QuicConnectionState`.
(c) Added a callback in `conn->pendingCallbacks` when a packet loss is detected

Reviewed By: bschlinker

Differential Revision: D23018569

fbshipit-source-id: e70d954839bdb70679ecd52d2bd1a6a6841f6778
This commit is contained in:
Kanthi Nagaraj
2020-09-11 17:53:30 -07:00
committed by Facebook GitHub Bot
parent 6ef992b5ea
commit 327af996e2
9 changed files with 517 additions and 167 deletions

141
quic/api/Observer.h Normal file
View File

@@ -0,0 +1,141 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
#pragma once
#include <quic/QuicException.h>
#include <quic/common/SmallVec.h>
#include <quic/state/OutstandingPacket.h>
namespace quic {
class QuicSocket;
/**
* ===== Instrumentation Observer API =====
*/
/**
* Observer of socket instrumentation events.
*/
class InstrumentationObserver {
public:
struct LostPacket {
explicit LostPacket(
bool lostbytimeout,
bool lostbyreorder,
const quic::OutstandingPacket& pkt)
: lostByTimeout(lostbytimeout),
lostByReorderThreshold(lostbyreorder),
packet(pkt) {}
bool lostByTimeout{false};
bool lostByReorderThreshold{false};
const quic::OutstandingPacket packet;
};
struct ObserverLossEvent {
explicit ObserverLossEvent(TimePoint time = Clock::now())
: lossTime(time) {}
bool hasPackets() {
return lostPackets.size() > 0;
}
void addLostPacket(
bool lostByTimeout,
bool lostByReorder,
const quic::OutstandingPacket& packet) {
lostPackets.emplace_back(lostByTimeout, lostByReorder, packet);
}
const TimePoint lossTime;
std::vector<LostPacket> lostPackets;
};
virtual ~InstrumentationObserver() = default;
/**
* observerDetach() will be invoked when the observer is uninstalled.
*
* No further callbacks will be invoked after observerDetach().
*
* @param socket Socket where observer was uninstalled.
*/
virtual void observerDetach(QuicSocket* /* socket */) noexcept = 0;
/**
* appRateLimited() is invoked when the socket is app rate limited.
*
* @param socket Socket that has become application rate limited.
*/
virtual void appRateLimited(QuicSocket* /* socket */) {}
/**
* packetLossDetected() is invoked when a packet loss is detected.
*
* @param packet const reference to the packet that was determined to be
* lost.
*/
virtual void packetLossDetected(
const struct ObserverLossEvent& /* lossEvent */) {}
};
// Container for instrumentation observers.
// Avoids heap allocation for up to 2 observers being installed.
using InstrumentationObserverVec = SmallVec<InstrumentationObserver*, 2>;
/**
* ===== Lifecycle Observer API =====
*/
/**
* Observer of socket lifecycle events.
*/
class LifecycleObserver {
public:
virtual ~LifecycleObserver() = default;
/**
* observerAttach() will be invoked when an observer is added.
*
* @param socket Socket where observer was installed.
*/
virtual void observerAttach(QuicSocket* /* socket */) noexcept = 0;
/**
* observerDetach() will be invoked if the observer is uninstalled prior
* to socket destruction.
*
* No further callbacks will be invoked after observerDetach().
*
* @param socket Socket where observer was uninstalled.
*/
virtual void observerDetach(QuicSocket* /* socket */) noexcept = 0;
/**
* destroy() will be invoked when the QuicSocket's destructor is invoked.
*
* No further callbacks will be invoked after destroy().
*
* @param socket Socket being destroyed.
*/
virtual void destroy(QuicSocket* /* socket */) noexcept = 0;
/**
* close() will be invoked when the socket is being closed.
*
* If the callback handler does not unsubscribe itself upon being called,
* then it may be called multiple times (e.g., by a call to close() by
* the application, and then again when closeNow() is called on
* destruction).
*
* @param socket Socket being closed.
* @param errorOpt Error information, if connection closed due to error.
*/
virtual void close(
QuicSocket* /* socket */,
const folly::Optional<
std::pair<QuicErrorCode, std::string>>& /* errorOpt */) noexcept = 0;
};
} // namespace quic

View File

@@ -13,6 +13,7 @@
#include <folly/io/IOBuf.h> #include <folly/io/IOBuf.h>
#include <folly/io/async/AsyncTransportCertificate.h> #include <folly/io/async/AsyncTransportCertificate.h>
#include <quic/QuicConstants.h> #include <quic/QuicConstants.h>
#include <quic/api/Observer.h>
#include <quic/codec/Types.h> #include <quic/codec/Types.h>
#include <quic/common/SmallVec.h> #include <quic/common/SmallVec.h>
#include <quic/state/StateData.h> #include <quic/state/StateData.h>
@@ -1093,61 +1094,6 @@ class QuicSocket {
*/ */
virtual void setCongestionControl(CongestionControlType type) = 0; virtual void setCongestionControl(CongestionControlType type) = 0;
/**
* ===== Lifecycle Observer API =====
*/
/**
* Observer of socket lifecycle events.
*/
class LifecycleObserver {
public:
virtual ~LifecycleObserver() = default;
/**
* observerAttach() will be invoked when an observer is added.
*
* @param socket Socket where observer was installed.
*/
virtual void observerAttach(QuicSocket* /* socket */) noexcept = 0;
/**
* observerDetach() will be invoked if the observer is uninstalled prior
* to socket destruction.
*
* No further callbacks will be invoked after observerDetach().
*
* @param socket Socket where observer was uninstalled.
*/
virtual void observerDetach(QuicSocket* /* socket */) noexcept = 0;
/**
* destroy() will be invoked when the QuicSocket's destructor is invoked.
*
* No further callbacks will be invoked after destroy().
*
* @param socket Socket being destroyed.
*/
virtual void destroy(QuicSocket* /* socket */) noexcept = 0;
/**
* close() will be invoked when the socket is being closed.
*
* If the callback handler does not unsubscribe itself upon being called,
* then it may be called multiple times (e.g., by a call to close() by
* the application, and then again when closeNow() is called on
* destruction).
*
* @param socket Socket being closed.
* @param errorOpt Error information, if connection closed due to error.
*/
virtual void close(
QuicSocket* /* socket */,
const folly::Optional<std::pair<
QuicErrorCode,
std::string>>& /* errorOpt */) noexcept = 0;
};
// Container for lifecycle observers. // Container for lifecycle observers.
// Avoids heap allocation for up to 2 observers being installed. // Avoids heap allocation for up to 2 observers being installed.
using LifecycleObserverVec = SmallVec<LifecycleObserver*, 2>; using LifecycleObserverVec = SmallVec<LifecycleObserver*, 2>;
@@ -1181,38 +1127,6 @@ class QuicSocket {
FOLLY_NODISCARD virtual const LifecycleObserverVec& getLifecycleObservers() FOLLY_NODISCARD virtual const LifecycleObserverVec& getLifecycleObservers()
const = 0; const = 0;
/**
* ===== Instrumentation Observer API =====
*/
/**
* Observer of socket instrumentation events.
*/
class InstrumentationObserver {
public:
virtual ~InstrumentationObserver() = default;
/**
* observerDetach() will be invoked when the observer is uninstalled.
*
* No further callbacks will be invoked after observerDetach().
*
* @param socket Socket where observer was uninstalled.
*/
virtual void observerDetach(QuicSocket* /* socket */) noexcept = 0;
/**
* appRateLimited() is invoked when the socket is app rate limited.
*
* @param socket Socket that has become application rate limited.
*/
virtual void appRateLimited(QuicSocket* /* socket */) {}
};
// Container for instrumentation observers.
// Avoids heap allocation for up to 2 observers being installed.
using InstrumentationObserverVec = SmallVec<InstrumentationObserver*, 2>;
/** /**
* Adds a instrumentation observer. * Adds a instrumentation observer.
* *

View File

@@ -249,10 +249,10 @@ void QuicTransportBase::closeImpl(
for (const auto& cb : lifecycleObservers_) { for (const auto& cb : lifecycleObservers_) {
cb->close(this, errorCode); cb->close(this, errorCode);
} }
for (const auto& cb : instrumentationObservers_) { for (const auto& cb : conn_->instrumentationObservers_) {
cb->observerDetach(this); cb->observerDetach(this);
} }
instrumentationObservers_.clear(); conn_->instrumentationObservers_.clear();
if (closeState_ == CloseState::CLOSED) { if (closeState_ == CloseState::CLOSED) {
return; return;
@@ -1645,6 +1645,12 @@ void QuicTransportBase::processCallbacksAfterNetworkData() {
} }
} }
// to call any callbacks added for observers
for (const auto& callback : conn_->pendingCallbacks) {
callback();
}
conn_->pendingCallbacks.clear();
handlePingCallback(); handlePingCallback();
if (closeState_ != CloseState::OPEN) { if (closeState_ != CloseState::OPEN) {
return; return;
@@ -2596,29 +2602,30 @@ QuicTransportBase::getLifecycleObservers() const {
void QuicTransportBase::addInstrumentationObserver( void QuicTransportBase::addInstrumentationObserver(
InstrumentationObserver* observer) { InstrumentationObserver* observer) {
instrumentationObservers_.push_back(CHECK_NOTNULL(observer)); conn_->instrumentationObservers_.push_back(CHECK_NOTNULL(observer));
} }
bool QuicTransportBase::removeInstrumentationObserver( bool QuicTransportBase::removeInstrumentationObserver(
InstrumentationObserver* observer) { InstrumentationObserver* observer) {
const auto eraseIt = std::remove( const auto eraseIt = std::remove(
instrumentationObservers_.begin(), conn_->instrumentationObservers_.begin(),
instrumentationObservers_.end(), conn_->instrumentationObservers_.end(),
observer); observer);
if (eraseIt == instrumentationObservers_.end()) { if (eraseIt == conn_->instrumentationObservers_.end()) {
return false; return false;
} }
for (auto it = eraseIt; it != instrumentationObservers_.end(); it++) { for (auto it = eraseIt; it != conn_->instrumentationObservers_.end(); it++) {
(*it)->observerDetach(this); (*it)->observerDetach(this);
} }
instrumentationObservers_.erase(eraseIt, instrumentationObservers_.end()); conn_->instrumentationObservers_.erase(
eraseIt, conn_->instrumentationObservers_.end());
return true; return true;
} }
const QuicTransportBase::InstrumentationObserverVec& const InstrumentationObserverVec&
QuicTransportBase::getInstrumentationObservers() const { QuicTransportBase::getInstrumentationObservers() const {
return instrumentationObservers_; return conn_->instrumentationObservers_;
} }
void QuicTransportBase::writeSocketData() { void QuicTransportBase::writeSocketData() {
@@ -2669,7 +2676,7 @@ void QuicTransportBase::writeSocketData() {
conn_->congestionController->setAppLimited(); conn_->congestionController->setAppLimited();
// notify via connection call and any instrumentation callbacks // notify via connection call and any instrumentation callbacks
connCallback_->onAppRateLimited(); connCallback_->onAppRateLimited();
for (const auto& cb : instrumentationObservers_) { for (const auto& cb : conn_->instrumentationObservers_) {
cb->appRateLimited(this); cb->appRateLimited(this);
} }
} }

View File

@@ -307,7 +307,7 @@ class MockLoopDetectorCallback : public LoopDetectorCallback {
MOCK_METHOD2(onSuspiciousReadLoops, void(uint64_t, NoReadReason)); MOCK_METHOD2(onSuspiciousReadLoops, void(uint64_t, NoReadReason));
}; };
class MockLifecycleObserver : public QuicSocket::LifecycleObserver { class MockLifecycleObserver : public LifecycleObserver {
public: public:
GMOCK_METHOD1_(, noexcept, , observerAttach, void(QuicSocket*)); GMOCK_METHOD1_(, noexcept, , observerAttach, void(QuicSocket*));
GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*)); GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*));
@@ -322,10 +322,26 @@ class MockLifecycleObserver : public QuicSocket::LifecycleObserver {
const folly::Optional<std::pair<QuicErrorCode, std::string>>&)); const folly::Optional<std::pair<QuicErrorCode, std::string>>&));
}; };
class MockInstrumentationObserver : public QuicSocket::InstrumentationObserver { class MockInstrumentationObserver : public InstrumentationObserver {
public: public:
GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*)); GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*));
GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*)); GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*));
GMOCK_METHOD1_(
,
noexcept,
,
packetLossDetected,
void(const ObserverLossEvent&));
static auto getLossPacketMatcher(bool reorderLoss, bool timeoutLoss) {
return AllOf(
testing::Field(
&InstrumentationObserver::LostPacket::lostByReorderThreshold,
testing::Eq(reorderLoss)),
testing::Field(
&InstrumentationObserver::LostPacket::lostByTimeout,
testing::Eq(timeoutLoss)));
}
}; };
inline std::ostream& operator<<(std::ostream& os, const MockQuicTransport&) { inline std::ostream& operator<<(std::ostream& os, const MockQuicTransport&) {

View File

@@ -423,6 +423,10 @@ class TestQuicTransport
writeSocketData(); writeSocketData();
} }
void invokeProcessCallbacksAfterNetworkData() {
processCallbacksAfterNetworkData();
}
QuicServerConnectionState* transportConn; QuicServerConnectionState* transportConn;
std::unique_ptr<Aead> aead; std::unique_ptr<Aead> aead;
std::unique_ptr<PacketNumberCipher> headerCipher; std::unique_ptr<PacketNumberCipher> headerCipher;
@@ -3652,5 +3656,27 @@ TEST_F(
Mock::VerifyAndClearExpectations(cb2.get()); Mock::VerifyAndClearExpectations(cb2.get());
} }
TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksDeleted) {
auto noopCallback = [] {};
transport->transportConn->pendingCallbacks.emplace_back(noopCallback);
EXPECT_EQ(1, size(transport->transportConn->pendingCallbacks));
transport->invokeProcessCallbacksAfterNetworkData();
EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
}
TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksInvoked) {
uint32_t callbacksInvoked = 0;
auto countingCallback = [&]() { callbacksInvoked++; };
for (int i = 0; i < 2; i++) {
transport->transportConn->pendingCallbacks.emplace_back(countingCallback);
}
EXPECT_EQ(2, size(transport->transportConn->pendingCallbacks));
transport->invokeProcessCallbacksAfterNetworkData();
EXPECT_EQ(2, callbacksInvoked);
EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
}
} // namespace test } // namespace test
} // namespace quic } // namespace quic

View File

@@ -9,6 +9,7 @@
#pragma once #pragma once
#include <quic/QuicConstants.h> #include <quic/QuicConstants.h>
#include <quic/api/Observer.h>
#include <quic/codec/Types.h> #include <quic/codec/Types.h>
#include <quic/common/TimeUtil.h> #include <quic/common/TimeUtil.h>
#include <quic/flowcontrol/QuicFlowController.h> #include <quic/flowcontrol/QuicFlowController.h>
@@ -204,6 +205,7 @@ folly::Optional<CongestionController::LossEvent> detectLossPackets(
<< " delayUntilLost=" << delayUntilLost.count() << "us" << " delayUntilLost=" << delayUntilLost.count() << "us"
<< " " << conn; << " " << conn;
CongestionController::LossEvent lossEvent(lossTime); CongestionController::LossEvent lossEvent(lossTime);
InstrumentationObserver::ObserverLossEvent observerLossEvent(lossTime);
// Note that time based loss detection is also within the same PNSpace. // Note that time based loss detection is also within the same PNSpace.
auto iter = getFirstOutstandingPacket(conn, pnSpace); auto iter = getFirstOutstandingPacket(conn, pnSpace);
bool shouldSetTimer = false; bool shouldSetTimer = false;
@@ -218,16 +220,19 @@ folly::Optional<CongestionController::LossEvent> detectLossPackets(
iter++; iter++;
continue; continue;
} }
bool lost = (lossTime - pkt.time) > delayUntilLost; bool lostByTimeout = (lossTime - pkt.time) > delayUntilLost;
lost = lost || bool lostByReorder =
(*largestAcked - currentPacketNum) > conn.lossState.reorderingThreshold; (*largestAcked - currentPacketNum) > conn.lossState.reorderingThreshold;
if (!lost) {
if (!(lostByTimeout || lostByReorder)) {
// We can exit early here because if packet N doesn't meet the // We can exit early here because if packet N doesn't meet the
// threshold, then packet N + 1 will not either. // threshold, then packet N + 1 will not either.
shouldSetTimer = true; shouldSetTimer = true;
break; break;
} }
lossEvent.addLostPacket(pkt); lossEvent.addLostPacket(pkt);
observerLossEvent.addLostPacket(lostByTimeout, lostByReorder, pkt);
if (pkt.associatedEvent) { if (pkt.associatedEvent) {
DCHECK_GT(conn.outstandings.clonedPacketsCount, 0); DCHECK_GT(conn.outstandings.clonedPacketsCount, 0);
--conn.outstandings.clonedPacketsCount; --conn.outstandings.clonedPacketsCount;
@@ -260,6 +265,15 @@ folly::Optional<CongestionController::LossEvent> detectLossPackets(
iter++; iter++;
} }
// if there are observers, enqueue a function to call it
if (observerLossEvent.hasPackets()) {
for (const auto& observer : conn.instrumentationObservers_) {
conn.pendingCallbacks.emplace_back([observer, observerLossEvent] {
observer->packetLossDetected(observerLossEvent);
});
}
}
auto earliest = getFirstOutstandingPacket(conn, pnSpace); auto earliest = getFirstOutstandingPacket(conn, pnSpace);
for (; earliest != conn.outstandings.packets.end(); for (; earliest != conn.outstandings.packets.end();
earliest = getNextOutstandingPacket(conn, pnSpace, earliest + 1)) { earliest = getNextOutstandingPacket(conn, pnSpace, earliest + 1)) {

View File

@@ -12,6 +12,7 @@
#include <folly/io/async/test/MockAsyncUDPSocket.h> #include <folly/io/async/test/MockAsyncUDPSocket.h>
#include <folly/io/async/test/MockTimeoutManager.h> #include <folly/io/async/test/MockTimeoutManager.h>
#include <quic/api/QuicTransportFunctions.h> #include <quic/api/QuicTransportFunctions.h>
#include <quic/api/test/Mocks.h>
#include <quic/client/state/ClientStateMachine.h> #include <quic/client/state/ClientStateMachine.h>
#include <quic/codec/DefaultConnectionIdAlgo.h> #include <quic/codec/DefaultConnectionIdAlgo.h>
#include <quic/common/test/TestUtils.h> #include <quic/common/test/TestUtils.h>
@@ -121,6 +122,11 @@ class QuicLossFunctionsTest : public TestWithParam<PacketNumberSpace> {
MockLossTimeout timeout; MockLossTimeout timeout;
std::unique_ptr<MockQuicStats> transportInfoCb_; std::unique_ptr<MockQuicStats> transportInfoCb_;
std::unique_ptr<ConnectionIdAlgo> connIdAlgo_; std::unique_ptr<ConnectionIdAlgo> connIdAlgo_;
auto getLossPacketMatcher(bool lossByReorder, bool lossByTimeout) {
return MockInstrumentationObserver::getLossPacketMatcher(
lossByReorder, lossByTimeout);
}
}; };
auto testingLossMarkFunc(std::vector<PacketNum>& lostPackets) { auto testingLossMarkFunc(std::vector<PacketNum>& lostPackets) {
@@ -1592,6 +1598,210 @@ TEST_F(QuicLossFunctionsTest, PersistentCongestion) {
EXPECT_FALSE(isPersistentCongestion(*conn, currentTime - 100us, currentTime)); EXPECT_FALSE(isPersistentCongestion(*conn, currentTime - 100us, currentTime));
} }
TEST_F(QuicLossFunctionsTest, TestReorderLossObserverCallback) {
auto ib = MockInstrumentationObserver();
auto conn = createConn();
// Register 1 instrumentation observer
conn->instrumentationObservers_.emplace_back(&ib);
auto noopLossVisitor = [](auto&, auto&, bool) {};
PacketNum largestSent = 0;
for (int i = 0; i < 7; ++i) {
largestSent =
sendPacket(*conn, TimePoint(i * 10ms), folly::none, PacketType::OneRtt);
}
// Some packets are already acked
conn->outstandings.packets.erase(
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData) + 2,
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData) + 5);
// setting a very low reordering threshold to force loss by reorder
conn->lossState.reorderingThreshold = 1;
// setting time out parameters higher than the time at which detectLossPackets
// is called to make sure there are no losses by timeout
conn->lossState.srtt = 400ms;
conn->lossState.lrtt = 350ms;
conn->transportSettings.timeReorderingThreshDividend = 1.0;
conn->transportSettings.timeReorderingThreshDivisor = 1.0;
TimePoint checkTime = TimePoint(200ms);
detectLossPackets(
*conn,
largestSent + 1,
noopLossVisitor,
checkTime,
PacketNumberSpace::AppData);
// expecting 1 callback to be stacked
EXPECT_EQ(1, size(conn->pendingCallbacks));
// Out of 1, 2, 3, 4, 5, 6, 7 -- we deleted (acked) 3,4,5.
// 1, 2 and 6 are "lost" due to reodering. None lost due to timeout
EXPECT_CALL(
ib,
packetLossDetected(Field(
&InstrumentationObserver::ObserverLossEvent::lostPackets,
UnorderedElementsAre(
getLossPacketMatcher(true, false),
getLossPacketMatcher(true, false),
getLossPacketMatcher(true, false)))))
.Times(1);
for (auto& callback : conn->pendingCallbacks) {
callback();
}
}
TEST_F(QuicLossFunctionsTest, TestTimeoutLossObserverCallback) {
auto ib = MockInstrumentationObserver();
auto conn = createConn();
// Register 1 instrumentation observer
conn->instrumentationObservers_.emplace_back(&ib);
auto noopLossVisitor = [](auto&, auto&, bool) {};
PacketNum largestSent = 0;
// send 7 packets
for (int i = 0; i < 7; ++i) {
largestSent =
sendPacket(*conn, TimePoint(i * 10ms), folly::none, PacketType::OneRtt);
}
// setting a very high reordering threshold to force loss by timeout only
conn->lossState.reorderingThreshold = 100;
// setting time out parameters lower than the time at which detectLossPackets
// is called to make sure all packets timeout
conn->lossState.srtt = 400ms;
conn->lossState.lrtt = 350ms;
conn->transportSettings.timeReorderingThreshDividend = 1.0;
conn->transportSettings.timeReorderingThreshDivisor = 1.0;
TimePoint checkTime = TimePoint(500ms);
detectLossPackets(
*conn,
largestSent + 1,
noopLossVisitor,
checkTime,
PacketNumberSpace::AppData);
// expecting 1 callback to be stacked
EXPECT_EQ(1, size(conn->pendingCallbacks));
// expecting all packets to be lost due to timeout
EXPECT_CALL(
ib,
packetLossDetected(Field(
&InstrumentationObserver::ObserverLossEvent::lostPackets,
UnorderedElementsAre(
getLossPacketMatcher(false, true),
getLossPacketMatcher(false, true),
getLossPacketMatcher(false, true),
getLossPacketMatcher(false, true),
getLossPacketMatcher(false, true),
getLossPacketMatcher(false, true),
getLossPacketMatcher(false, true)))))
.Times(1);
for (auto& callback : conn->pendingCallbacks) {
callback();
}
}
TEST_F(QuicLossFunctionsTest, TestTimeoutAndReorderLossObserverCallback) {
auto ib = MockInstrumentationObserver();
auto conn = createConn();
// Register 1 instrumentation observer
conn->instrumentationObservers_.emplace_back(&ib);
auto noopLossVisitor = [](auto&, auto&, bool) {};
PacketNum largestSent = 0;
for (int i = 0; i < 7; ++i) {
largestSent =
sendPacket(*conn, TimePoint(i * 10ms), folly::none, PacketType::OneRtt);
}
// Some packets are already acked
conn->outstandings.packets.erase(
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData) + 2,
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData) + 5);
// setting a low reorder threshold
conn->lossState.reorderingThreshold = 1;
// setting time out parameters lower than the time at which detectLossPackets
// is called to make sure all packets timeout
conn->lossState.srtt = 400ms;
conn->lossState.lrtt = 350ms;
conn->transportSettings.timeReorderingThreshDividend = 1.0;
conn->transportSettings.timeReorderingThreshDivisor = 1.0;
TimePoint checkTime = TimePoint(500ms);
detectLossPackets(
*conn,
largestSent + 1,
noopLossVisitor,
checkTime,
PacketNumberSpace::AppData);
// expecting 1 callback to be stacked
EXPECT_EQ(1, size(conn->pendingCallbacks));
// Out of 1, 2, 3, 4, 5, 6, 7 -- we deleted (acked) 3,4,5.
// 1, 2, 6 are lost due to reodering and timeout.
// 7 just timed out
EXPECT_CALL(
ib,
packetLossDetected(Field(
&InstrumentationObserver::ObserverLossEvent::lostPackets,
UnorderedElementsAre(
getLossPacketMatcher(true, true),
getLossPacketMatcher(true, true),
getLossPacketMatcher(true, true),
getLossPacketMatcher(false, true)))))
.Times(1);
for (auto& callback : conn->pendingCallbacks) {
callback();
}
}
TEST_F(QuicLossFunctionsTest, TestNoInstrumentationObserverCallback) {
auto conn = createConn();
auto noopLossVisitor = [](auto&, auto&, bool) {};
PacketNum largestSent = 0;
for (int i = 0; i < 7; ++i) {
largestSent =
sendPacket(*conn, TimePoint(i * 10ms), folly::none, PacketType::OneRtt);
}
// Some packets are already acked
conn->outstandings.packets.erase(
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData) + 2,
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData) + 5);
// setting a low reorder threshold
conn->lossState.reorderingThreshold = 1;
// setting time out parameters lower than the time at which detectLossPackets
// is called to make sure all packets timeout
conn->lossState.srtt = 400ms;
conn->lossState.lrtt = 350ms;
conn->transportSettings.timeReorderingThreshDividend = 1.0;
conn->transportSettings.timeReorderingThreshDivisor = 1.0;
TimePoint checkTime = TimePoint(500ms);
detectLossPackets(
*conn,
largestSent + 1,
noopLossVisitor,
checkTime,
PacketNumberSpace::AppData);
// expecting 0 callbacks to be queued
EXPECT_EQ(0, size(conn->pendingCallbacks));
}
INSTANTIATE_TEST_CASE_P( INSTANTIATE_TEST_CASE_P(
QuicLossFunctionsTests, QuicLossFunctionsTests,
QuicLossFunctionsTest, QuicLossFunctionsTest,

View File

@@ -0,0 +1,78 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
#pragma once
#include <quic/codec/Types.h>
#include <quic/state/PacketEvent.h>
namespace quic {
// Data structure to represent outstanding retransmittable packets
struct OutstandingPacket {
// Structure representing the frames that are outstanding including the header
// that was sent.
RegularQuicWritePacket packet;
// Time that the packet was sent.
TimePoint time;
// Size of the packet sent on the wire.
uint32_t encodedSize;
// Whether this packet has any data from stream 0
bool isHandshake;
// Total sent bytes on this connection including this packet itself when this
// packet is sent.
uint64_t totalBytesSent;
// Information regarding the last acked packet on this connection when this
// packet is sent.
struct LastAckedPacketInfo {
TimePoint sentTime;
TimePoint ackTime;
// Total sent bytes on this connection when the last acked packet is acked.
uint64_t totalBytesSent;
// Total acked bytes on this connection when last acked packet is acked,
// including the last acked packet.
uint64_t totalBytesAcked;
LastAckedPacketInfo(
TimePoint sentTimeIn,
TimePoint ackTimeIn,
uint64_t totalBytesSentIn,
uint64_t totalBytesAckedIn)
: sentTime(sentTimeIn),
ackTime(ackTimeIn),
totalBytesSent(totalBytesSentIn),
totalBytesAcked(totalBytesAckedIn) {}
};
folly::Optional<LastAckedPacketInfo> lastAckedPacketInfo;
// PacketEvent associated with this OutstandingPacket. This will be a
// folly::none if the packet isn't a clone and hasn't been cloned.
folly::Optional<PacketEvent> associatedEvent;
/**
* Whether the packet is sent when congestion controller is in app-limited
* state.
*/
bool isAppLimited{false};
// True if spurious loss detection is enabled and this packet was declared
// lost.
bool declaredLost{false};
OutstandingPacket(
RegularQuicWritePacket packetIn,
TimePoint timeIn,
uint32_t encodedSizeIn,
bool isHandshakeIn,
uint64_t totalBytesSentIn)
: packet(std::move(packetIn)),
time(std::move(timeIn)),
encodedSize(encodedSizeIn),
isHandshake(isHandshakeIn),
totalBytesSent(totalBytesSentIn) {}
};
} // namespace quic

View File

@@ -9,6 +9,7 @@
#pragma once #pragma once
#include <quic/QuicConstants.h> #include <quic/QuicConstants.h>
#include <quic/api/Observer.h>
#include <quic/codec/ConnectionIdAlgo.h> #include <quic/codec/ConnectionIdAlgo.h>
#include <quic/codec/QuicReadCodec.h> #include <quic/codec/QuicReadCodec.h>
#include <quic/codec/QuicWriteCodec.h> #include <quic/codec/QuicWriteCodec.h>
@@ -18,6 +19,7 @@
#include <quic/handshake/HandshakeLayer.h> #include <quic/handshake/HandshakeLayer.h>
#include <quic/logging/QLogger.h> #include <quic/logging/QLogger.h>
#include <quic/state/AckStates.h> #include <quic/state/AckStates.h>
#include <quic/state/OutstandingPacket.h>
#include <quic/state/PacketEvent.h> #include <quic/state/PacketEvent.h>
#include <quic/state/PendingPathRateLimiter.h> #include <quic/state/PendingPathRateLimiter.h>
#include <quic/state/QuicStreamManager.h> #include <quic/state/QuicStreamManager.h>
@@ -103,70 +105,6 @@ struct NetworkDataSingle {
} }
}; };
// Data structure to represent outstanding retransmittable packets
struct OutstandingPacket {
// Structure representing the frames that are outstanding including the header
// that was sent.
RegularQuicWritePacket packet;
// Time that the packet was sent.
TimePoint time;
// Size of the packet sent on the wire.
uint32_t encodedSize;
// Whether this packet has any data from stream 0
bool isHandshake;
// Total sent bytes on this connection including this packet itself when this
// packet is sent.
uint64_t totalBytesSent;
// Information regarding the last acked packet on this connection when this
// packet is sent.
struct LastAckedPacketInfo {
TimePoint sentTime;
TimePoint ackTime;
// Total sent bytes on this connection when the last acked packet is acked.
uint64_t totalBytesSent;
// Total acked bytes on this connection when last acked packet is acked,
// including the last acked packet.
uint64_t totalBytesAcked;
LastAckedPacketInfo(
TimePoint sentTimeIn,
TimePoint ackTimeIn,
uint64_t totalBytesSentIn,
uint64_t totalBytesAckedIn)
: sentTime(sentTimeIn),
ackTime(ackTimeIn),
totalBytesSent(totalBytesSentIn),
totalBytesAcked(totalBytesAckedIn) {}
};
folly::Optional<LastAckedPacketInfo> lastAckedPacketInfo;
// PacketEvent associated with this OutstandingPacket. This will be a
// folly::none if the packet isn't a clone and hasn't been cloned.
folly::Optional<PacketEvent> associatedEvent;
/**
* Whether the packet is sent when congestion controller is in app-limited
* state.
*/
bool isAppLimited{false};
// True if spurious loss detection is enabled and this packet was declared
// lost.
bool declaredLost{false};
OutstandingPacket(
RegularQuicWritePacket packetIn,
TimePoint timeIn,
uint32_t encodedSizeIn,
bool isHandshakeIn,
uint64_t totalBytesSentIn)
: packet(std::move(packetIn)),
time(std::move(timeIn)),
encodedSize(encodedSizeIn),
isHandshake(isHandshakeIn),
totalBytesSent(totalBytesSentIn) {}
};
struct OutstandingsInfo { struct OutstandingsInfo {
// Sent packets which have not been acked. These are sorted by PacketNum. // Sent packets which have not been acked. These are sorted by PacketNum.
std::deque<OutstandingPacket> packets; std::deque<OutstandingPacket> packets;
@@ -852,6 +790,12 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
* Return true if replacement succeeds. * Return true if replacement succeeds.
*/ */
bool retireAndSwitchPeerConnectionIds(); bool retireAndSwitchPeerConnectionIds();
// instrumentation observers
InstrumentationObserverVec instrumentationObservers_;
// queue of functions to be called in processCallbacksAfterNetworkData
std::vector<std::function<void()>> pendingCallbacks;
}; };
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st); std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st);