1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Use ByteEvent as a key in maps and sets

Summary:
This commit demonstrates the use of ByteEvents in maps. Intentionally didn't define the custom comparator and hash function inside the quic library because we want consumers of ByteEvents to define equality of 2 ByteEvents in their own way.
This will help recipients of ByteEventCallback keep track of pending,
received and cancelled ByteEvents easily.

In addition, the behavior of registerByteEventCallback was modified to NOT allow duplicate registrations. A pair of registrations is considered duplicate if they both have the same stream ID, stream offset, ByteEvent type and recipient callback pointer. In such cases, registerByteEventCallback returns an INVALID_OPERATION error.
This is critical to make sure that ByteEvents work well in maps, which may be used by recipients.

Reviewed By: bschlinker

Differential Revision: D27100623

fbshipit-source-id: 084a4765fa6c98fdc1b98414fbd30582cf1e5139
This commit is contained in:
Sridhar Srinivasan
2021-03-30 19:51:10 -07:00
committed by Facebook GitHub Bot
parent dfc9475107
commit ac468ad891
4 changed files with 236 additions and 6 deletions

View File

@@ -806,7 +806,8 @@ class QuicSocket {
Type type; Type type;
// sRTT at time of event // sRTT at time of event
// TODO(bschlinker): Deprecate, caller can fetch transport state if desired. // TODO(bschlinker): Deprecate, caller can fetch transport state if
// desired.
std::chrono::microseconds srtt{0us}; std::chrono::microseconds srtt{0us};
}; };

View File

@@ -2095,6 +2095,15 @@ QuicTransportBase::registerByteEventCallback(
byteEventMapIt->second.end(), byteEventMapIt->second.end(),
offset, offset,
[&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; }); [&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; });
if (pos != byteEventMapIt->second.begin()) {
auto prev = std::prev(pos);
if ((prev->offset == offset) && (prev->callback == cb)) {
// ByteEvent has been already registered for the same type, id,
// offset and for the same recipient, return an INVALID_OPERATION error
// to prevent duplicate registrations.
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
}
byteEventMapIt->second.emplace(pos, offset, cb); byteEventMapIt->second.emplace(pos, offset, cb);
} }
auto stream = conn_->streamManager->getStream(id); auto stream = conn_->streamManager->getStream(id);

View File

@@ -30,6 +30,7 @@ namespace test {
constexpr uint8_t kStreamIncrement = 0x04; constexpr uint8_t kStreamIncrement = 0x04;
using ByteEvent = QuicTransportBase::ByteEvent; using ByteEvent = QuicTransportBase::ByteEvent;
using ByteEventCancellation = QuicTransportBase::ByteEventCancellation;
enum class TestFrameType : uint8_t { enum class TestFrameType : uint8_t {
STREAM, STREAM,
@@ -111,6 +112,61 @@ class TestPingCallback : public QuicSocket::PingCallback {
void pingTimeout() noexcept override {} void pingTimeout() noexcept override {}
}; };
class TestByteEventCallback : public QuicSocket::ByteEventCallback {
public:
using HashFn = std::function<size_t(const ByteEvent&)>;
using ComparatorFn = std::function<bool(const ByteEvent&, const ByteEvent&)>;
enum class Status { REGISTERED = 1, RECEIVED = 2, CANCELLED = 3 };
void onByteEventRegistered(ByteEvent event) override {
EXPECT_TRUE(byteEventTracker_.find(event) == byteEventTracker_.end());
byteEventTracker_[event] = Status::REGISTERED;
}
void onByteEvent(ByteEvent event) override {
EXPECT_TRUE(byteEventTracker_.find(event) != byteEventTracker_.end());
byteEventTracker_[event] = Status::RECEIVED;
}
void onByteEventCanceled(ByteEventCancellation cancellation) override {
const ByteEvent& event = cancellation;
EXPECT_TRUE(byteEventTracker_.find(event) != byteEventTracker_.end());
byteEventTracker_[event] = Status::CANCELLED;
}
std::unordered_map<ByteEvent, Status, HashFn, ComparatorFn>
getByteEventTracker() const {
return byteEventTracker_;
}
private:
// Custom hash and comparator functions that use only id, offset and types
// (not the srtt)
HashFn hash = [](const ByteEvent& e) {
return folly::hash::hash_combine(e.id, e.offset, e.type);
};
ComparatorFn comparator = [](const ByteEvent& lhs, const ByteEvent& rhs) {
return ((lhs.id == rhs.id) && (lhs.offset == rhs.offset));
};
std::unordered_map<ByteEvent, Status, HashFn, ComparatorFn> byteEventTracker_{
/* bucket count */ 4,
hash,
comparator};
};
static auto
getByteEventMatcher(ByteEvent::Type type, StreamId id, uint64_t offset) {
return AllOf(
testing::Field(&ByteEvent::type, testing::Eq(type)),
testing::Field(&ByteEvent::id, testing::Eq(id)),
testing::Field(&ByteEvent::offset, testing::Eq(offset)));
}
static auto getByteEventTrackerMatcher(
ByteEvent event,
TestByteEventCallback::Status status) {
return Pair(getByteEventMatcher(event.type, event.id, event.offset), status);
}
class TestQuicTransport class TestQuicTransport
: public QuicTransportBase, : public QuicTransportBase,
public std::enable_shared_from_this<TestQuicTransport> { public std::enable_shared_from_this<TestQuicTransport> {
@@ -405,6 +461,7 @@ class QuicTransportImplTest : public Test {
protected: protected:
std::unique_ptr<folly::EventBase> evb; std::unique_ptr<folly::EventBase> evb;
NiceMock<MockConnectionCallback> connCallback; NiceMock<MockConnectionCallback> connCallback;
TestByteEventCallback byteEventCallback;
std::shared_ptr<TestQuicTransport> transport; std::shared_ptr<TestQuicTransport> transport;
folly::test::MockAsyncUDPSocket* socketPtr; folly::test::MockAsyncUDPSocket* socketPtr;
}; };
@@ -1354,6 +1411,166 @@ TEST_F(QuicTransportImplTest, DeliveryCallbackUnsetOne) {
transport->close(folly::none); transport->close(folly::none);
} }
TEST_F(QuicTransportImplTest, ByteEventCallbacksManagementSingleStream) {
auto stream = transport->createBidirectionalStream().value();
uint64_t offset1 = 10, offset2 = 20;
ByteEvent txEvent1 = {
.id = stream, .offset = offset1, .type = ByteEvent::Type::TX};
ByteEvent txEvent2 = {
.id = stream, .offset = offset2, .type = ByteEvent::Type::TX};
ByteEvent ackEvent1 = {
.id = stream, .offset = offset1, .type = ByteEvent::Type::ACK};
ByteEvent ackEvent2 = {
.id = stream, .offset = offset2, .type = ByteEvent::Type::ACK};
// Register 2 TX and 2 ACK events for the same stream at 2 different offsets
transport->registerTxCallback(
txEvent1.id, txEvent1.offset, &byteEventCallback);
transport->registerTxCallback(
txEvent2.id, txEvent2.offset, &byteEventCallback);
transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::REGISTERED)));
// Registering the same events a second time will result in an error.
// as double registrations are not allowed.
folly::Expected<folly::Unit, LocalErrorCode> ret;
ret = transport->registerTxCallback(
txEvent1.id, txEvent1.offset, &byteEventCallback);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerTxCallback(
txEvent2.id, txEvent2.offset, &byteEventCallback);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::REGISTERED)));
// On the ACK events, the transport usually sets the srtt value. This value
// should have NO EFFECT on the ByteEvent's hash and we still should be able
// to identify the previously registered byte event correctly.
ackEvent1.srtt = std::chrono::microseconds(1000);
ackEvent2.srtt = std::chrono::microseconds(2000);
// Deliver 1 TX and 1 ACK event. Cancel the other TX anc ACK event
byteEventCallback.onByteEvent(txEvent1);
byteEventCallback.onByteEvent(ackEvent2);
byteEventCallback.onByteEventCanceled(txEvent2);
byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent1);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::RECEIVED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::CANCELLED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::CANCELLED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::RECEIVED)));
}
TEST_F(QuicTransportImplTest, ByteEventCallbacksManagementDifferentStreams) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
ByteEvent txEvent1 = {
.id = stream1, .offset = 10, .type = ByteEvent::Type::TX};
ByteEvent txEvent2 = {
.id = stream2, .offset = 20, .type = ByteEvent::Type::TX};
ByteEvent ackEvent1 = {
.id = stream1, .offset = 10, .type = ByteEvent::Type::ACK};
ByteEvent ackEvent2 = {
.id = stream2, .offset = 20, .type = ByteEvent::Type::ACK};
EXPECT_THAT(byteEventCallback.getByteEventTracker(), IsEmpty());
// Register 2 TX and 2 ACK events for 2 separate streams.
transport->registerTxCallback(
txEvent1.id, txEvent1.offset, &byteEventCallback);
transport->registerTxCallback(
txEvent2.id, txEvent2.offset, &byteEventCallback);
transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::REGISTERED)));
// On the ACK events, the transport usually sets the srtt value. This value
// should have NO EFFECT on the ByteEvent's hash and we should still be able
// to identify the previously registered byte event correctly.
ackEvent1.srtt = std::chrono::microseconds(1000);
ackEvent2.srtt = std::chrono::microseconds(2000);
// Deliver the TX event for stream 1 and cancel the ACK event for stream 2
byteEventCallback.onByteEvent(txEvent1);
byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent2);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::RECEIVED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::CANCELLED)));
// Deliver the TX event for stream 2 and cancel the ACK event for stream 1
byteEventCallback.onByteEvent(txEvent2);
byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent1);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::RECEIVED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::RECEIVED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::CANCELLED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::CANCELLED)));
}
TEST_F(QuicTransportImplTest, RegisterTxDeliveryCallbackLowerThanExpected) { TEST_F(QuicTransportImplTest, RegisterTxDeliveryCallbackLowerThanExpected) {
auto stream = transport->createBidirectionalStream().value(); auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1; StrictMock<MockByteEventCallback> txcb1;

View File

@@ -2240,17 +2240,20 @@ TEST_F(QuicTransportTest, InvokeTxCallbacksSingleByte) {
Mock::VerifyAndClearExpectations(&firstByteTxCb); Mock::VerifyAndClearExpectations(&firstByteTxCb);
Mock::VerifyAndClearExpectations(&lastByteTxCb); Mock::VerifyAndClearExpectations(&lastByteTxCb);
// even if we register pastlastByte again, it shouldn't be triggered // Even if we register pastlastByte again, it shouldn't trigger
// onByteEventRegistered because this is a duplicate registration.
EXPECT_CALL(pastlastByteTxCb, onByteEventRegistered(getTxMatcher(stream, 1))) EXPECT_CALL(pastlastByteTxCb, onByteEventRegistered(getTxMatcher(stream, 1)))
.Times(1); .Times(0);
transport_->registerTxCallback(stream, 1, &pastlastByteTxCb); auto ret = transport_->registerTxCallback(stream, 1, &pastlastByteTxCb);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
Mock::VerifyAndClearExpectations(&pastlastByteTxCb); Mock::VerifyAndClearExpectations(&pastlastByteTxCb);
// pastlastByteTxCb::onByteEvent will never get called // pastlastByteTxCb::onByteEvent will never get called
// cancel gets called instead // cancel gets called instead
// onByteEventCanceled called twice, since added twice // Even though we attempted to register the ByteEvent twice, it resulted in
// an error. So, onByteEventCanceled should be called only once.
EXPECT_CALL(pastlastByteTxCb, onByteEventCanceled(getTxMatcher(stream, 1))) EXPECT_CALL(pastlastByteTxCb, onByteEventCanceled(getTxMatcher(stream, 1)))
.Times(2); .Times(1);
transport_->close(folly::none); transport_->close(folly::none);
Mock::VerifyAndClearExpectations(&pastlastByteTxCb); Mock::VerifyAndClearExpectations(&pastlastByteTxCb);
} }