1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Fix bug in registerByteEvent

Summary:
When registerByteEvent is called for a particular stream ID and offset, it is
possible that the byte event has already occurred in the past. In this case, we
schedule a lambda to be executed in the *next* iteration of the event loop, to
deliver the byte event to the callback.

There are 2 bugs in the current implementation of runEvbAsync which executes
the scheduled lambda.
1. It is possible that multiple recipients register for the same byte event on
the same stream ID and offset. If one of the recipients is already delivered
the byte event, then there is a possibility that the lambda can redeliver the
same notification a second time, which is not desirable.

2. There is a race condition where-in between the time the lambda was scheduled to
the time the lambda was excecuted, it is possible that the byte event gets
delivered when we process callbacks on receiving network data. In this
situation, there is a bug in the lambda where-in if there is another byte event
registered for any other offset for the same stream ID (regardless of the
callback), we will falsely deliver the byte event that was requested, resulting
in double delivery of the same byte event.

This commit fixes both bugs by checking that the requested byte event still
exists in the ByteEventDetail deque (comparing the offset as well as callback),
before erasing and delivering it.

Reviewed By: bschlinker

Differential Revision: D27846813

fbshipit-source-id: 099b79af2580fba5561ec33c4be45a8cf84c39e8
This commit is contained in:
Sridhar Srinivasan
2021-04-20 15:21:37 -07:00
committed by Facebook GitHub Bot
parent ece2008f26
commit 98b5e2aa8f
2 changed files with 290 additions and 6 deletions

View File

@@ -2107,8 +2107,13 @@ QuicTransportBase::registerByteEventCallback(
offset, offset,
[&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; }); [&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; });
if (pos != byteEventMapIt->second.begin()) { if (pos != byteEventMapIt->second.begin()) {
auto prev = std::prev(pos); auto matchingEvent = std::find_if(
if ((prev->offset == offset) && (prev->callback == cb)) { byteEventMapIt->second.begin(),
pos + 1,
[offset, cb](const ByteEventDetail& p) {
return ((p.offset == offset) && (p.callback == cb));
});
if (matchingEvent != pos + 1) {
// ByteEvent has been already registered for the same type, id, // ByteEvent has been already registered for the same type, id,
// offset and for the same recipient, return an INVALID_OPERATION error // offset and for the same recipient, return an INVALID_OPERATION error
// to prevent duplicate registrations. // to prevent duplicate registrations.
@@ -2148,11 +2153,23 @@ QuicTransportBase::registerByteEventCallback(
if (streamByteEventCbIt == byteEventMapL.end()) { if (streamByteEventCbIt == byteEventMapL.end()) {
return; return;
} }
auto pos = std::lower_bound(
// This is scheduled to run in the future (during the next iteration of
// the event loop). It is possible that the ByteEventDetail list gets
// mutated between the time it was scheduled to now when we are ready to
// run it. Look at the current outstanding ByteEvents for this stream ID
// and confirm that our ByteEvent's offset and recipient callback are
// still present.
auto pos = std::find_if(
streamByteEventCbIt->second.begin(), streamByteEventCbIt->second.begin(),
streamByteEventCbIt->second.end(), streamByteEventCbIt->second.end(),
offset, [offset, cb](const ByteEventDetail& p) {
[&](const ByteEventDetail& p, uint64_t o) { return p.offset < o; }); return ((p.offset == offset) && (p.callback == cb));
});
// if our byteEvent is not present, it must have been delivered already.
if (pos == streamByteEventCbIt->second.end()) {
return;
}
streamByteEventCbIt->second.erase(pos); streamByteEventCbIt->second.erase(pos);
ByteEvent byteEvent = {}; ByteEvent byteEvent = {};

View File

@@ -424,6 +424,33 @@ class TestQuicTransport
processCallbacksAfterNetworkData(); processCallbacksAfterNetworkData();
} }
// Simulates the delivery of a Byte Event callback, similar to the way it
// happens in QuicTransportBase::processCallbacksAfterNetworkData() or
// in the runOnEvbAsync lambda in
// QuicTransportBase::registerByteEventCallback()
bool deleteRegisteredByteEvent(
StreamId id,
uint64_t offset,
ByteEventCallback* cb,
ByteEvent::Type type) {
auto& byteEventMap = getByteEventMap(type);
auto streamByteEventCbIt = byteEventMap.find(id);
if (streamByteEventCbIt == byteEventMap.end()) {
return false;
}
auto pos = std::find_if(
streamByteEventCbIt->second.begin(),
streamByteEventCbIt->second.end(),
[offset, cb](const ByteEventDetail& p) {
return ((p.offset == offset) && (p.callback == cb));
});
if (pos == streamByteEventCbIt->second.end()) {
return false;
}
streamByteEventCbIt->second.erase(pos);
return true;
}
QuicServerConnectionState* transportConn; QuicServerConnectionState* transportConn;
std::unique_ptr<Aead> aead; std::unique_ptr<Aead> aead;
std::unique_ptr<PacketNumberCipher> headerCipher; std::unique_ptr<PacketNumberCipher> headerCipher;
@@ -460,6 +487,10 @@ class QuicTransportImplTest : public Test {
return MockByteEventCallback::getTxMatcher(id, offset); return MockByteEventCallback::getTxMatcher(id, offset);
} }
auto getAckMatcher(StreamId id, uint64_t offset) {
return MockByteEventCallback::getAckMatcher(id, offset);
}
protected: protected:
std::unique_ptr<folly::EventBase> evb; std::unique_ptr<folly::EventBase> evb;
NiceMock<MockConnectionCallback> connCallback; NiceMock<MockConnectionCallback> connCallback;
@@ -1641,6 +1672,241 @@ TEST_F(
Mock::VerifyAndClearExpectations(&dcb); Mock::VerifyAndClearExpectations(&dcb);
} }
TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRegistrationsTx) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Have 2 different recipients register for a callback on the same stream ID
// and offset that is before the curernt write offset, they will both be
// scheduled for immediate delivery.
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 3)));
transport->registerTxCallback(stream, 3, &txcb1);
transport->registerTxCallback(stream, 3, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, re-register the same callbacks, it should not go through.
auto ret = transport->registerTxCallback(stream, 3, &txcb1);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerTxCallback(stream, 3, &txcb2);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
// Deliver the first set of registrations.
EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(1);
EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 3))).Times(1);
evb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_F(
QuicTransportImplTest,
RegisterDeliveryCallbackMultipleRegistrationsAck) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Have 2 different recipients register for a callback on the same stream ID
// and offset that is before the curernt write offset, they will both be
// scheduled for immediate delivery.
EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 3)));
transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1);
transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, re-register the same callbacks, it should not go through.
auto ret = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb1);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb2);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
// Deliver the first set of registrations.
EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(1);
EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 3))).Times(1);
evb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRecipientsTx) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Have 2 different recipients register for a callback on the same stream ID
// and offset.
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 3)));
transport->registerTxCallback(stream, 3, &txcb1);
transport->registerTxCallback(stream, 3, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, *before* the runOnEvbAsync gets a chance to run, simulate the
// delivery of the callback for txcb1 (offset = 3) by deleting it from the
// outstanding callback queue for this stream ID. This is similar to what
// happens in processCallbacksAfterNetworkData.
bool deleted = transport->deleteRegisteredByteEvent(
stream, 3, &txcb1, ByteEvent::Type::TX);
CHECK_EQ(true, deleted);
// Only the callback for txcb2 should be outstanding now. Run the loop to
// confirm.
EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(0);
EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 3))).Times(1);
evb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRecipientsAck) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Have 2 different recipients register for a callback on the same stream ID
// and offset.
EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 3)));
transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1);
transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, *before* the runOnEvbAsync gets a chance to run, simulate the
// delivery of the callback for txcb1 (offset = 3) by deleting it from the
// outstanding callback queue for this stream ID. This is similar to what
// happens in processCallbacksAfterNetworkData.
bool deleted = transport->deleteRegisteredByteEvent(
stream, 3, &txcb1, ByteEvent::Type::ACK);
CHECK_EQ(true, deleted);
// Only the callback for txcb2 should be outstanding now. Run the loop to
// confirm.
EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(0);
EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 3))).Times(1);
evb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackAsyncDeliveryTx) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Register tx callbacks for the same stream at offsets 3 (before current
// write offset) and 10 (after current write offset).
// txcb1 (offset = 3) will be scheduled in the lambda (runOnEvbAsync)
// for immediate delivery. txcb2 (offset = 10) will be queued for delivery
// when the actual TX for this offset occurs in the future.
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 10)));
transport->registerTxCallback(stream, 3, &txcb1);
transport->registerTxCallback(stream, 10, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, *before* the runOnEvbAsync gets a chance to run, simulate the
// delivery of the callback for txcb1 (offset = 3) by deleting it from the
// outstanding callback queue for this stream ID. This is similar to what
// happens in processCallbacksAfterNetworkData.
bool deleted = transport->deleteRegisteredByteEvent(
stream, 3, &txcb1, ByteEvent::Type::TX);
CHECK_EQ(true, deleted);
// Only txcb2 (offset = 10) should be outstanding now. Run the loop.
// txcb1 (offset = 3) should not be delivered now because it is already
// delivered. txcb2 (offset = 10) should not be delivered because the
// current write offset (7) is still less than the offset requested (10)
EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(0);
EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 10))).Times(0);
evb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream, 10)));
transport->close(folly::none);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackAsyncDeliveryAck) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Register tx callbacks for the same stream at offsets 3 (before current
// write offset) and 10 (after current write offset).
// txcb1 (offset = 3) will be scheduled in the lambda (runOnEvbAsync)
// for immediate delivery. txcb2 (offset = 10) will be queued for delivery
// when the actual TX for this offset occurs in the future.
EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 10)));
transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1);
transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 10, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, *before* the runOnEvbAsync gets a chance to run, simulate the
// delivery of the callback for txcb1 (offset = 3) by deleting it from the
// outstanding callback queue for this stream ID. This is similar to what
// happens in processCallbacksAfterNetworkData.
bool deleted = transport->deleteRegisteredByteEvent(
stream, 3, &txcb1, ByteEvent::Type::ACK);
CHECK_EQ(true, deleted);
// Only txcb2 (offset = 10) should be outstanding now. Run the loop.
// txcb1 (offset = 3) should not be delivered now because it is already
// delivered. txcb2 (offset = 10) should not be delivered because the
// current write offset (7) is still less than the offset requested (10)
EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(0);
EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 10))).Times(0);
evb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
EXPECT_CALL(txcb2, onByteEventCanceled(getAckMatcher(stream, 10)));
transport->close(folly::none);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_F(QuicTransportImplTest, CancelAllByteEventCallbacks) { TEST_F(QuicTransportImplTest, CancelAllByteEventCallbacks) {
auto stream1 = transport->createBidirectionalStream().value(); auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value();
@@ -2299,7 +2565,8 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) {
// Close the last stream. // Close the last stream.
auto streamState = transport->transportConn->streamManager->getStream(stream); auto streamState = transport->transportConn->streamManager->getStream(stream);
// Fake that the data was TXed and delivered to keep all the state consistent. // Fake that the data was TXed and delivered to keep all the state
// consistent.
streamState->currentWriteOffset = 7; streamState->currentWriteOffset = 7;
transport->transportConn->streamManager->addTx(stream); transport->transportConn->streamManager->addTx(stream);
transport->transportConn->streamManager->addDeliverable(stream); transport->transportConn->streamManager->addDeliverable(stream);