diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 658b742e1..be641aede 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -2107,8 +2107,13 @@ QuicTransportBase::registerByteEventCallback( offset, [&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; }); if (pos != byteEventMapIt->second.begin()) { - auto prev = std::prev(pos); - if ((prev->offset == offset) && (prev->callback == cb)) { + auto matchingEvent = std::find_if( + byteEventMapIt->second.begin(), + pos + 1, + [offset, cb](const ByteEventDetail& p) { + return ((p.offset == offset) && (p.callback == cb)); + }); + if (matchingEvent != pos + 1) { // ByteEvent has been already registered for the same type, id, // offset and for the same recipient, return an INVALID_OPERATION error // to prevent duplicate registrations. @@ -2148,11 +2153,23 @@ QuicTransportBase::registerByteEventCallback( if (streamByteEventCbIt == byteEventMapL.end()) { return; } - auto pos = std::lower_bound( + + // This is scheduled to run in the future (during the next iteration of + // the event loop). It is possible that the ByteEventDetail list gets + // mutated between the time it was scheduled to now when we are ready to + // run it. Look at the current outstanding ByteEvents for this stream ID + // and confirm that our ByteEvent's offset and recipient callback are + // still present. + auto pos = std::find_if( streamByteEventCbIt->second.begin(), streamByteEventCbIt->second.end(), - offset, - [&](const ByteEventDetail& p, uint64_t o) { return p.offset < o; }); + [offset, cb](const ByteEventDetail& p) { + return ((p.offset == offset) && (p.callback == cb)); + }); + // if our byteEvent is not present, it must have been delivered already. + if (pos == streamByteEventCbIt->second.end()) { + return; + } streamByteEventCbIt->second.erase(pos); ByteEvent byteEvent = {}; diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index a1701518c..c734d90e7 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -424,6 +424,33 @@ class TestQuicTransport processCallbacksAfterNetworkData(); } + // Simulates the delivery of a Byte Event callback, similar to the way it + // happens in QuicTransportBase::processCallbacksAfterNetworkData() or + // in the runOnEvbAsync lambda in + // QuicTransportBase::registerByteEventCallback() + bool deleteRegisteredByteEvent( + StreamId id, + uint64_t offset, + ByteEventCallback* cb, + ByteEvent::Type type) { + auto& byteEventMap = getByteEventMap(type); + auto streamByteEventCbIt = byteEventMap.find(id); + if (streamByteEventCbIt == byteEventMap.end()) { + return false; + } + auto pos = std::find_if( + streamByteEventCbIt->second.begin(), + streamByteEventCbIt->second.end(), + [offset, cb](const ByteEventDetail& p) { + return ((p.offset == offset) && (p.callback == cb)); + }); + if (pos == streamByteEventCbIt->second.end()) { + return false; + } + streamByteEventCbIt->second.erase(pos); + return true; + } + QuicServerConnectionState* transportConn; std::unique_ptr aead; std::unique_ptr headerCipher; @@ -460,6 +487,10 @@ class QuicTransportImplTest : public Test { return MockByteEventCallback::getTxMatcher(id, offset); } + auto getAckMatcher(StreamId id, uint64_t offset) { + return MockByteEventCallback::getAckMatcher(id, offset); + } + protected: std::unique_ptr evb; NiceMock connCallback; @@ -1641,6 +1672,241 @@ TEST_F( Mock::VerifyAndClearExpectations(&dcb); } +TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRegistrationsTx) { + auto stream = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + + // Set the current write offset to 7. + auto streamState = transport->transportConn->streamManager->getStream(stream); + streamState->currentWriteOffset = 7; + streamState->ackedIntervals.insert(0, 6); + + // Have 2 different recipients register for a callback on the same stream ID + // and offset that is before the curernt write offset, they will both be + // scheduled for immediate delivery. + EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3))); + EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 3))); + transport->registerTxCallback(stream, 3, &txcb1); + transport->registerTxCallback(stream, 3, &txcb2); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + + // Now, re-register the same callbacks, it should not go through. + auto ret = transport->registerTxCallback(stream, 3, &txcb1); + EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error()); + ret = transport->registerTxCallback(stream, 3, &txcb2); + EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error()); + + // Deliver the first set of registrations. + EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(1); + EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 3))).Times(1); + evb->loopOnce(); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); +} + +TEST_F( + QuicTransportImplTest, + RegisterDeliveryCallbackMultipleRegistrationsAck) { + auto stream = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + + // Set the current write offset to 7. + auto streamState = transport->transportConn->streamManager->getStream(stream); + streamState->currentWriteOffset = 7; + streamState->ackedIntervals.insert(0, 6); + + // Have 2 different recipients register for a callback on the same stream ID + // and offset that is before the curernt write offset, they will both be + // scheduled for immediate delivery. + EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3))); + EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 3))); + transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1); + transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb2); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + + // Now, re-register the same callbacks, it should not go through. + auto ret = transport->registerByteEventCallback( + ByteEvent::Type::ACK, stream, 3, &txcb1); + EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error()); + ret = transport->registerByteEventCallback( + ByteEvent::Type::ACK, stream, 3, &txcb2); + EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error()); + + // Deliver the first set of registrations. + EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(1); + EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 3))).Times(1); + evb->loopOnce(); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); +} + +TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRecipientsTx) { + auto stream = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + + // Set the current write offset to 7. + auto streamState = transport->transportConn->streamManager->getStream(stream); + streamState->currentWriteOffset = 7; + streamState->ackedIntervals.insert(0, 6); + + // Have 2 different recipients register for a callback on the same stream ID + // and offset. + EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3))); + EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 3))); + transport->registerTxCallback(stream, 3, &txcb1); + transport->registerTxCallback(stream, 3, &txcb2); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + + // Now, *before* the runOnEvbAsync gets a chance to run, simulate the + // delivery of the callback for txcb1 (offset = 3) by deleting it from the + // outstanding callback queue for this stream ID. This is similar to what + // happens in processCallbacksAfterNetworkData. + bool deleted = transport->deleteRegisteredByteEvent( + stream, 3, &txcb1, ByteEvent::Type::TX); + CHECK_EQ(true, deleted); + + // Only the callback for txcb2 should be outstanding now. Run the loop to + // confirm. + EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(0); + EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 3))).Times(1); + evb->loopOnce(); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); +} + +TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackMultipleRecipientsAck) { + auto stream = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + + // Set the current write offset to 7. + auto streamState = transport->transportConn->streamManager->getStream(stream); + streamState->currentWriteOffset = 7; + streamState->ackedIntervals.insert(0, 6); + + // Have 2 different recipients register for a callback on the same stream ID + // and offset. + EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3))); + EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 3))); + transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1); + transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb2); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + + // Now, *before* the runOnEvbAsync gets a chance to run, simulate the + // delivery of the callback for txcb1 (offset = 3) by deleting it from the + // outstanding callback queue for this stream ID. This is similar to what + // happens in processCallbacksAfterNetworkData. + bool deleted = transport->deleteRegisteredByteEvent( + stream, 3, &txcb1, ByteEvent::Type::ACK); + CHECK_EQ(true, deleted); + + // Only the callback for txcb2 should be outstanding now. Run the loop to + // confirm. + EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(0); + EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 3))).Times(1); + evb->loopOnce(); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); +} + +TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackAsyncDeliveryTx) { + auto stream = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + + // Set the current write offset to 7. + auto streamState = transport->transportConn->streamManager->getStream(stream); + streamState->currentWriteOffset = 7; + streamState->ackedIntervals.insert(0, 6); + + // Register tx callbacks for the same stream at offsets 3 (before current + // write offset) and 10 (after current write offset). + // txcb1 (offset = 3) will be scheduled in the lambda (runOnEvbAsync) + // for immediate delivery. txcb2 (offset = 10) will be queued for delivery + // when the actual TX for this offset occurs in the future. + EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3))); + EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 10))); + transport->registerTxCallback(stream, 3, &txcb1); + transport->registerTxCallback(stream, 10, &txcb2); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + + // Now, *before* the runOnEvbAsync gets a chance to run, simulate the + // delivery of the callback for txcb1 (offset = 3) by deleting it from the + // outstanding callback queue for this stream ID. This is similar to what + // happens in processCallbacksAfterNetworkData. + bool deleted = transport->deleteRegisteredByteEvent( + stream, 3, &txcb1, ByteEvent::Type::TX); + CHECK_EQ(true, deleted); + + // Only txcb2 (offset = 10) should be outstanding now. Run the loop. + // txcb1 (offset = 3) should not be delivered now because it is already + // delivered. txcb2 (offset = 10) should not be delivered because the + // current write offset (7) is still less than the offset requested (10) + EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(0); + EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 10))).Times(0); + evb->loopOnce(); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream, 10))); + transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb2); +} + +TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackAsyncDeliveryAck) { + auto stream = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + + // Set the current write offset to 7. + auto streamState = transport->transportConn->streamManager->getStream(stream); + streamState->currentWriteOffset = 7; + streamState->ackedIntervals.insert(0, 6); + + // Register tx callbacks for the same stream at offsets 3 (before current + // write offset) and 10 (after current write offset). + // txcb1 (offset = 3) will be scheduled in the lambda (runOnEvbAsync) + // for immediate delivery. txcb2 (offset = 10) will be queued for delivery + // when the actual TX for this offset occurs in the future. + EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3))); + EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 10))); + transport->registerByteEventCallback(ByteEvent::Type::ACK, stream, 3, &txcb1); + transport->registerByteEventCallback( + ByteEvent::Type::ACK, stream, 10, &txcb2); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + + // Now, *before* the runOnEvbAsync gets a chance to run, simulate the + // delivery of the callback for txcb1 (offset = 3) by deleting it from the + // outstanding callback queue for this stream ID. This is similar to what + // happens in processCallbacksAfterNetworkData. + bool deleted = transport->deleteRegisteredByteEvent( + stream, 3, &txcb1, ByteEvent::Type::ACK); + CHECK_EQ(true, deleted); + + // Only txcb2 (offset = 10) should be outstanding now. Run the loop. + // txcb1 (offset = 3) should not be delivered now because it is already + // delivered. txcb2 (offset = 10) should not be delivered because the + // current write offset (7) is still less than the offset requested (10) + EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(0); + EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 10))).Times(0); + evb->loopOnce(); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + + EXPECT_CALL(txcb2, onByteEventCanceled(getAckMatcher(stream, 10))); + transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb2); +} + TEST_F(QuicTransportImplTest, CancelAllByteEventCallbacks) { auto stream1 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value(); @@ -2299,7 +2565,8 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) { // Close the last stream. auto streamState = transport->transportConn->streamManager->getStream(stream); - // Fake that the data was TXed and delivered to keep all the state consistent. + // Fake that the data was TXed and delivered to keep all the state + // consistent. streamState->currentWriteOffset = 7; transport->transportConn->streamManager->addTx(stream); transport->transportConn->streamManager->addDeliverable(stream);