mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-06 22:22:38 +03:00
Exposing quicSocket to observers (#185)
Summary: Giving more access to the current state of the connection by exposing the socket. Pull Request resolved: https://github.com/facebookincubator/mvfst/pull/185 Reviewed By: yangchi Differential Revision: D23924861 Pulled By: bschlinker fbshipit-source-id: 16866a93e58b6544e25d474a51ca17fab9ffdc55
This commit is contained in:
committed by
Facebook GitHub Bot
parent
f16d60e922
commit
185a6730b6
@@ -93,18 +93,23 @@ class InstrumentationObserver {
|
|||||||
/**
|
/**
|
||||||
* packetLossDetected() is invoked when a packet loss is detected.
|
* packetLossDetected() is invoked when a packet loss is detected.
|
||||||
*
|
*
|
||||||
|
* @param socket Socket when the callback is processed.
|
||||||
* @param packet const reference to the packet that was determined to be
|
* @param packet const reference to the packet that was determined to be
|
||||||
* lost.
|
* lost.
|
||||||
*/
|
*/
|
||||||
virtual void packetLossDetected(
|
virtual void packetLossDetected(
|
||||||
|
QuicSocket*, /* socket */
|
||||||
const struct ObserverLossEvent& /* lossEvent */) {}
|
const struct ObserverLossEvent& /* lossEvent */) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* rttSampleGenerated() is invoked when a RTT sample is made.
|
* rttSampleGenerated() is invoked when a RTT sample is made.
|
||||||
*
|
*
|
||||||
* @param packet const reference to the packet with the RTT
|
* @param socket Socket when the callback is processed.
|
||||||
|
* @param packet const reference to the packet with the RTT.
|
||||||
*/
|
*/
|
||||||
virtual void rttSampleGenerated(const PacketRTT& /* RTT sample */) {}
|
virtual void rttSampleGenerated(
|
||||||
|
QuicSocket*, /* socket */
|
||||||
|
const PacketRTT& /* RTT sample */) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Container for instrumentation observers.
|
// Container for instrumentation observers.
|
||||||
|
@@ -1662,7 +1662,7 @@ void QuicTransportBase::processCallbacksAfterNetworkData() {
|
|||||||
|
|
||||||
// to call any callbacks added for observers
|
// to call any callbacks added for observers
|
||||||
for (const auto& callback : conn_->pendingCallbacks) {
|
for (const auto& callback : conn_->pendingCallbacks) {
|
||||||
callback();
|
callback(this);
|
||||||
}
|
}
|
||||||
conn_->pendingCallbacks.clear();
|
conn_->pendingCallbacks.clear();
|
||||||
|
|
||||||
|
@@ -326,13 +326,18 @@ class MockInstrumentationObserver : public InstrumentationObserver {
|
|||||||
public:
|
public:
|
||||||
GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*));
|
GMOCK_METHOD1_(, noexcept, , observerDetach, void(QuicSocket*));
|
||||||
GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*));
|
GMOCK_METHOD1_(, noexcept, , appRateLimited, void(QuicSocket*));
|
||||||
GMOCK_METHOD1_(
|
GMOCK_METHOD2_(
|
||||||
,
|
,
|
||||||
noexcept,
|
noexcept,
|
||||||
,
|
,
|
||||||
packetLossDetected,
|
packetLossDetected,
|
||||||
void(const ObserverLossEvent&));
|
void(QuicSocket*, const ObserverLossEvent&));
|
||||||
GMOCK_METHOD1_(, noexcept, , rttSampleGenerated, void(const PacketRTT&));
|
GMOCK_METHOD2_(
|
||||||
|
,
|
||||||
|
noexcept,
|
||||||
|
,
|
||||||
|
rttSampleGenerated,
|
||||||
|
void(QuicSocket*, const PacketRTT&));
|
||||||
|
|
||||||
static auto getLossPacketMatcher(bool reorderLoss, bool timeoutLoss) {
|
static auto getLossPacketMatcher(bool reorderLoss, bool timeoutLoss) {
|
||||||
return AllOf(
|
return AllOf(
|
||||||
|
@@ -3657,7 +3657,7 @@ TEST_F(
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksDeleted) {
|
TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksDeleted) {
|
||||||
auto noopCallback = [] {};
|
auto noopCallback = [](QuicSocket*) {};
|
||||||
transport->transportConn->pendingCallbacks.emplace_back(noopCallback);
|
transport->transportConn->pendingCallbacks.emplace_back(noopCallback);
|
||||||
EXPECT_EQ(1, size(transport->transportConn->pendingCallbacks));
|
EXPECT_EQ(1, size(transport->transportConn->pendingCallbacks));
|
||||||
transport->invokeProcessCallbacksAfterNetworkData();
|
transport->invokeProcessCallbacksAfterNetworkData();
|
||||||
@@ -3666,7 +3666,7 @@ TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksDeleted) {
|
|||||||
|
|
||||||
TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksInvoked) {
|
TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksInvoked) {
|
||||||
uint32_t callbacksInvoked = 0;
|
uint32_t callbacksInvoked = 0;
|
||||||
auto countingCallback = [&]() { callbacksInvoked++; };
|
auto countingCallback = [&](QuicSocket*) { callbacksInvoked++; };
|
||||||
|
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < 2; i++) {
|
||||||
transport->transportConn->pendingCallbacks.emplace_back(countingCallback);
|
transport->transportConn->pendingCallbacks.emplace_back(countingCallback);
|
||||||
@@ -3678,5 +3678,22 @@ TEST_F(QuicTransportImplTest, ImplementationObserverCallbacksInvoked) {
|
|||||||
EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
|
EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(
|
||||||
|
QuicTransportImplTest,
|
||||||
|
ImplementationObserverCallbacksCorrectQuicSocket) {
|
||||||
|
QuicSocket* returnedSocket = nullptr;
|
||||||
|
auto func = [&](QuicSocket* qSocket) { returnedSocket = qSocket; };
|
||||||
|
auto ib = MockInstrumentationObserver();
|
||||||
|
|
||||||
|
EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
|
||||||
|
transport->transportConn->pendingCallbacks.emplace_back(func);
|
||||||
|
EXPECT_EQ(1, size(transport->transportConn->pendingCallbacks));
|
||||||
|
|
||||||
|
transport->invokeProcessCallbacksAfterNetworkData();
|
||||||
|
EXPECT_EQ(0, size(transport->transportConn->pendingCallbacks));
|
||||||
|
|
||||||
|
EXPECT_EQ(transport.get(), returnedSocket);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace test
|
} // namespace test
|
||||||
} // namespace quic
|
} // namespace quic
|
||||||
|
@@ -291,9 +291,10 @@ folly::Optional<CongestionController::LossEvent> detectLossPackets(
|
|||||||
// if there are observers, enqueue a function to call it
|
// if there are observers, enqueue a function to call it
|
||||||
if (observerLossEvent.hasPackets()) {
|
if (observerLossEvent.hasPackets()) {
|
||||||
for (const auto& observer : conn.instrumentationObservers_) {
|
for (const auto& observer : conn.instrumentationObservers_) {
|
||||||
conn.pendingCallbacks.emplace_back([observer, observerLossEvent] {
|
conn.pendingCallbacks.emplace_back(
|
||||||
observer->packetLossDetected(observerLossEvent);
|
[observer, observerLossEvent](QuicSocket* qSocket) {
|
||||||
});
|
observer->packetLossDetected(qSocket, observerLossEvent);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1922,16 +1922,18 @@ TEST_F(QuicLossFunctionsTest, TestReorderLossObserverCallback) {
|
|||||||
// 1, 2 and 6 are "lost" due to reodering. None lost due to timeout
|
// 1, 2 and 6 are "lost" due to reodering. None lost due to timeout
|
||||||
EXPECT_CALL(
|
EXPECT_CALL(
|
||||||
ib,
|
ib,
|
||||||
packetLossDetected(Field(
|
packetLossDetected(
|
||||||
&InstrumentationObserver::ObserverLossEvent::lostPackets,
|
nullptr,
|
||||||
UnorderedElementsAre(
|
Field(
|
||||||
getLossPacketMatcher(true, false),
|
&InstrumentationObserver::ObserverLossEvent::lostPackets,
|
||||||
getLossPacketMatcher(true, false),
|
UnorderedElementsAre(
|
||||||
getLossPacketMatcher(true, false)))))
|
getLossPacketMatcher(true, false),
|
||||||
|
getLossPacketMatcher(true, false),
|
||||||
|
getLossPacketMatcher(true, false)))))
|
||||||
.Times(1);
|
.Times(1);
|
||||||
|
|
||||||
for (auto& callback : conn->pendingCallbacks) {
|
for (auto& callback : conn->pendingCallbacks) {
|
||||||
callback();
|
callback(nullptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1973,20 +1975,22 @@ TEST_F(QuicLossFunctionsTest, TestTimeoutLossObserverCallback) {
|
|||||||
// expecting all packets to be lost due to timeout
|
// expecting all packets to be lost due to timeout
|
||||||
EXPECT_CALL(
|
EXPECT_CALL(
|
||||||
ib,
|
ib,
|
||||||
packetLossDetected(Field(
|
packetLossDetected(
|
||||||
&InstrumentationObserver::ObserverLossEvent::lostPackets,
|
nullptr,
|
||||||
UnorderedElementsAre(
|
Field(
|
||||||
getLossPacketMatcher(false, true),
|
&InstrumentationObserver::ObserverLossEvent::lostPackets,
|
||||||
getLossPacketMatcher(false, true),
|
UnorderedElementsAre(
|
||||||
getLossPacketMatcher(false, true),
|
getLossPacketMatcher(false, true),
|
||||||
getLossPacketMatcher(false, true),
|
getLossPacketMatcher(false, true),
|
||||||
getLossPacketMatcher(false, true),
|
getLossPacketMatcher(false, true),
|
||||||
getLossPacketMatcher(false, true),
|
getLossPacketMatcher(false, true),
|
||||||
getLossPacketMatcher(false, true)))))
|
getLossPacketMatcher(false, true),
|
||||||
|
getLossPacketMatcher(false, true),
|
||||||
|
getLossPacketMatcher(false, true)))))
|
||||||
.Times(1);
|
.Times(1);
|
||||||
|
|
||||||
for (auto& callback : conn->pendingCallbacks) {
|
for (auto& callback : conn->pendingCallbacks) {
|
||||||
callback();
|
callback(nullptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2034,17 +2038,19 @@ TEST_F(QuicLossFunctionsTest, TestTimeoutAndReorderLossObserverCallback) {
|
|||||||
// 7 just timed out
|
// 7 just timed out
|
||||||
EXPECT_CALL(
|
EXPECT_CALL(
|
||||||
ib,
|
ib,
|
||||||
packetLossDetected(Field(
|
packetLossDetected(
|
||||||
&InstrumentationObserver::ObserverLossEvent::lostPackets,
|
nullptr,
|
||||||
UnorderedElementsAre(
|
Field(
|
||||||
getLossPacketMatcher(true, true),
|
&InstrumentationObserver::ObserverLossEvent::lostPackets,
|
||||||
getLossPacketMatcher(true, true),
|
UnorderedElementsAre(
|
||||||
getLossPacketMatcher(true, true),
|
getLossPacketMatcher(true, true),
|
||||||
getLossPacketMatcher(false, true)))))
|
getLossPacketMatcher(true, true),
|
||||||
|
getLossPacketMatcher(true, true),
|
||||||
|
getLossPacketMatcher(false, true)))))
|
||||||
.Times(1);
|
.Times(1);
|
||||||
|
|
||||||
for (auto& callback : conn->pendingCallbacks) {
|
for (auto& callback : conn->pendingCallbacks) {
|
||||||
callback();
|
callback(nullptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -147,9 +147,10 @@ void processAckFrame(
|
|||||||
InstrumentationObserver::PacketRTT packetRTT(
|
InstrumentationObserver::PacketRTT packetRTT(
|
||||||
ackReceiveTimeOrNow, rttSample, frame.ackDelay, *rPacketIt);
|
ackReceiveTimeOrNow, rttSample, frame.ackDelay, *rPacketIt);
|
||||||
for (const auto& observer : conn.instrumentationObservers_) {
|
for (const auto& observer : conn.instrumentationObservers_) {
|
||||||
conn.pendingCallbacks.emplace_back([observer, packetRTT] {
|
conn.pendingCallbacks.emplace_back(
|
||||||
observer->rttSampleGenerated(packetRTT);
|
[observer, packetRTT](QuicSocket* qSocket) {
|
||||||
});
|
observer->rttSampleGenerated(qSocket, packetRTT);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
updateRtt(conn, rttSample, frame.ackDelay);
|
updateRtt(conn, rttSample, frame.ackDelay);
|
||||||
}
|
}
|
||||||
|
@@ -907,7 +907,7 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
|
|||||||
InstrumentationObserverVec instrumentationObservers_;
|
InstrumentationObserverVec instrumentationObservers_;
|
||||||
|
|
||||||
// queue of functions to be called in processCallbacksAfterNetworkData
|
// queue of functions to be called in processCallbacksAfterNetworkData
|
||||||
std::vector<std::function<void()>> pendingCallbacks;
|
std::vector<std::function<void(QuicSocket*)>> pendingCallbacks;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st);
|
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st);
|
||||||
|
@@ -1203,22 +1203,26 @@ TEST_P(AckHandlersTest, TestRTTPacketObserverCallback) {
|
|||||||
ackData.ackTime - packetRcvTime[ackData.endSeq]);
|
ackData.ackTime - packetRcvTime[ackData.endSeq]);
|
||||||
EXPECT_CALL(
|
EXPECT_CALL(
|
||||||
ib,
|
ib,
|
||||||
rttSampleGenerated(AllOf(
|
rttSampleGenerated(
|
||||||
Field(
|
nullptr,
|
||||||
&InstrumentationObserver::PacketRTT::rcvTime, ackData.ackTime),
|
AllOf(
|
||||||
Field(&InstrumentationObserver::PacketRTT::rttSample, rttSample),
|
|
||||||
Field(
|
|
||||||
&InstrumentationObserver::PacketRTT::ackDelay,
|
|
||||||
ackData.ackDelay),
|
|
||||||
Field(
|
|
||||||
&InstrumentationObserver::PacketRTT::metadata,
|
|
||||||
Field(
|
Field(
|
||||||
&quic::OutstandingPacketMetadata::inflightBytes,
|
&InstrumentationObserver::PacketRTT::rcvTime,
|
||||||
ackData.endSeq + 1)))));
|
ackData.ackTime),
|
||||||
|
Field(
|
||||||
|
&InstrumentationObserver::PacketRTT::rttSample, rttSample),
|
||||||
|
Field(
|
||||||
|
&InstrumentationObserver::PacketRTT::ackDelay,
|
||||||
|
ackData.ackDelay),
|
||||||
|
Field(
|
||||||
|
&InstrumentationObserver::PacketRTT::metadata,
|
||||||
|
Field(
|
||||||
|
&quic::OutstandingPacketMetadata::inflightBytes,
|
||||||
|
ackData.endSeq + 1)))));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto& callback : conn.pendingCallbacks) {
|
for (auto& callback : conn.pendingCallbacks) {
|
||||||
callback();
|
callback(nullptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user