From b4df09831b8c0dc34bcfedee9cdf4723c5dabb34 Mon Sep 17 00:00:00 2001 From: Brandon Schlinker Date: Thu, 16 Jul 2020 22:44:15 -0700 Subject: [PATCH] Support for TX timestamping Summary: Adds support for timestamping on TX (TX byte events). This allows the application to determine when a byte that it previously wrote to the transport was put onto the wire. Callbacks are processed within a new function `QuicTransportBase::processCallbacksAfterWriteData`, which is invoked by `writeSocketDataAndCatch`. Reviewed By: mjoras Differential Revision: D22008855 fbshipit-source-id: 99c1697cb74bb2387dbad231611be58f9392c99f --- quic/api/CMakeLists.txt | 1 + quic/api/QuicSocket.cpp | 17 + quic/api/QuicSocket.h | 30 +- quic/api/QuicTransportBase.cpp | 139 +++- quic/api/QuicTransportBase.h | 49 ++ quic/api/QuicTransportFunctions.cpp | 1 + quic/api/test/MockQuicSocket.h | 18 +- quic/api/test/Mocks.h | 25 + quic/api/test/QuicTransportBaseTest.cpp | 654 ++++++++++++++++++- quic/api/test/QuicTransportTest.cpp | 412 ++++++++++++ quic/server/test/QuicServerTransportTest.cpp | 27 + quic/state/QuicStreamFunctions.cpp | 10 + quic/state/QuicStreamFunctions.h | 8 + quic/state/QuicStreamManager.cpp | 1 + quic/state/QuicStreamManager.h | 60 +- quic/state/test/QuicStreamFunctionsTest.cpp | 32 + 16 files changed, 1446 insertions(+), 38 deletions(-) create mode 100644 quic/api/QuicSocket.cpp diff --git a/quic/api/CMakeLists.txt b/quic/api/CMakeLists.txt index 321e9b6f8..f09670716 100644 --- a/quic/api/CMakeLists.txt +++ b/quic/api/CMakeLists.txt @@ -8,6 +8,7 @@ add_library( IoBufQuicBatch.cpp QuicBatchWriter.cpp QuicPacketScheduler.cpp + QuicSocket.cpp QuicStreamAsyncTransport.cpp QuicTransportBase.cpp QuicTransportFunctions.cpp diff --git a/quic/api/QuicSocket.cpp b/quic/api/QuicSocket.cpp new file mode 100644 index 000000000..f006c78cb --- /dev/null +++ b/quic/api/QuicSocket.cpp @@ -0,0 +1,17 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + */ + +#include + +namespace quic { + +// required for C++14 compatibility +constexpr std::array + QuicSocket::ByteEvent::kByteEventTypes; + +} // namespace quic diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index a1255ef24..edeff4891 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -790,7 +790,9 @@ class QuicSocket { * Structure used to communicate TX and ACK/Delivery notifications. */ struct ByteEvent { - enum class Type { ACK = 1 }; + enum class Type { ACK = 1, TX = 2 }; + static constexpr std::array kByteEventTypes = {Type::ACK, + Type::TX}; StreamId id{0}; uint64_t offset{0}; @@ -864,6 +866,19 @@ class QuicSocket { } }; + /** + * Register a callback to be invoked when the stream offset was transmitted. + * + * Currently, an offset is considered "transmitted" if it has been written to + * to the underlying UDP socket, indicating that it has passed through + * congestion control and pacing. In the future, this callback may be + * triggered by socket/NIC software or hardware timestamps. + */ + virtual folly::Expected registerTxCallback( + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb) = 0; + /** * Register a byte event to be triggered when specified event type occurs for * the specified stream and offset. @@ -906,6 +921,19 @@ class QuicSocket { */ virtual void cancelByteEventCallbacks(const ByteEvent::Type type) = 0; + /** + * Get the number of pending byte events for the given stream. + */ + FOLLY_NODISCARD virtual size_t getNumByteEventCallbacksForStream( + const StreamId streamId) const = 0; + + /** + * Get the number of pending byte events of specified type for given stream. + */ + FOLLY_NODISCARD virtual size_t getNumByteEventCallbacksForStream( + const ByteEvent::Type type, + const StreamId streamId) const = 0; + /** * Write data/eof to the given stream. * diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 9ca1214c1..e4323d0fe 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1234,7 +1234,9 @@ void QuicTransportBase::cancelDeliveryCallbacksForStream( void QuicTransportBase::cancelByteEventCallbacksForStream( const StreamId id, const folly::Optional& offset) { - cancelByteEventCallbacksForStream(ByteEvent::Type::ACK, id, offset); + invokeForEachByteEventType(([this, id, &offset](const ByteEvent::Type type) { + cancelByteEventCallbacksForStream(type, id, offset); + })); } void QuicTransportBase::cancelByteEventCallbacksForStream( @@ -1252,6 +1254,9 @@ void QuicTransportBase::cancelByteEventCallbacksForStream( case ByteEvent::Type::ACK: conn_->streamManager->removeDeliverable(id); break; + case ByteEvent::Type::TX: + conn_->streamManager->removeTx(id); + break; } return; } @@ -1287,13 +1292,17 @@ void QuicTransportBase::cancelByteEventCallbacksForStream( case ByteEvent::Type::ACK: conn_->streamManager->removeDeliverable(id); break; + case ByteEvent::Type::TX: + conn_->streamManager->removeTx(id); + break; } byteEventMap.erase(byteEventMapIt); } } void QuicTransportBase::cancelAllByteEventCallbacks() { - cancelByteEventCallbacks(ByteEvent::Type::ACK); + invokeForEachByteEventType( + ([this](const ByteEvent::Type type) { cancelByteEventCallbacks(type); })); } void QuicTransportBase::cancelByteEventCallbacks(const ByteEvent::Type type) { @@ -1313,6 +1322,28 @@ void QuicTransportBase::cancelByteEventCallbacks(const ByteEvent::Type type) { } } +size_t QuicTransportBase::getNumByteEventCallbacksForStream( + const StreamId id) const { + size_t total = 0; + invokeForEachByteEventTypeConst( + ([this, id, &total](const ByteEvent::Type type) { + total += getNumByteEventCallbacksForStream(type, id); + })); + return total; +} + +size_t QuicTransportBase::getNumByteEventCallbacksForStream( + const ByteEvent::Type type, + const StreamId id) const { + const auto& byteEventMap = getByteEventMapConst(type); + const auto byteEventMapIt = byteEventMap.find(id); + if (byteEventMapIt == byteEventMap.end()) { + return 0; + } + const auto& streamByteEvents = byteEventMapIt->second; + return streamByteEvents.size(); +} + folly::Expected, LocalErrorCode> QuicTransportBase::read( StreamId id, size_t maxLen) { @@ -1497,6 +1528,65 @@ void QuicTransportBase::handlePingCallback() { conn_->pendingEvents.cancelPingTimeout = false; } +void QuicTransportBase::processCallbacksAfterWriteData() { + if (closeState_ != CloseState::OPEN) { + return; + } + + auto txStreamId = conn_->streamManager->popTx(); + while (txStreamId.has_value()) { + auto streamId = *txStreamId; + auto stream = conn_->streamManager->getStream(streamId); + auto largestOffsetTxed = getLargestWriteOffsetTxed(*stream); + // if it's in the set of streams with TX, we should have a valid offset + CHECK(largestOffsetTxed.has_value()); + + // lambda to help get the next callback to call for this stream + auto getNextTxCallbackForStreamAndCleanup = + [this, &largestOffsetTxed](const auto& streamId) + -> folly::Optional> { + auto txCallbacksForStreamIt = txCallbacks_.find(streamId); + if (txCallbacksForStreamIt == txCallbacks_.end() || + txCallbacksForStreamIt->second.empty()) { + return folly::none; + } + + auto& txCallbacksForStream = txCallbacksForStreamIt->second; + if (txCallbacksForStream.front().first > *largestOffsetTxed) { + return folly::none; + } + + // extract the callback, pop from the queue, then check for cleanup + auto result = txCallbacksForStream.front(); + txCallbacksForStream.pop_front(); + if (txCallbacksForStream.empty()) { + txCallbacks_.erase(txCallbacksForStreamIt); + } + return result; + }; + + folly::Optional> + nextOffsetAndCallback; + while ( + (nextOffsetAndCallback = + getNextTxCallbackForStreamAndCleanup(streamId))) { + ByteEvent byteEvent = {}; + byteEvent.id = streamId; + byteEvent.offset = nextOffsetAndCallback->first; + byteEvent.type = ByteEvent::Type::TX; + nextOffsetAndCallback->second->onByteEvent(byteEvent); + + // connection may be closed by callback + if (closeState_ != CloseState::OPEN) { + return; + } + } + + // pop the next stream + txStreamId = conn_->streamManager->popTx(); + } +} + void QuicTransportBase::processCallbacksAfterNetworkData() { if (closeState_ != CloseState::OPEN) { return; @@ -1981,6 +2071,14 @@ QuicTransportBase::registerDeliveryCallback( return registerByteEventCallback(ByteEvent::Type::ACK, id, offset, cb); } +folly::Expected +QuicTransportBase::registerTxCallback( + StreamId id, + uint64_t offset, + ByteEventCallback* cb) { + return registerByteEventCallback(ByteEvent::Type::TX, id, offset, cb); +} + folly::Expected QuicTransportBase::registerByteEventCallback( const ByteEvent::Type type, @@ -2021,14 +2119,17 @@ QuicTransportBase::registerByteEventCallback( } auto stream = conn_->streamManager->getStream(id); - folly::Optional nextOffsetToWait; + // if the callback is already ready, we still insert, but schedule to process + folly::Optional maxOffsetReady; switch (type) { case ByteEvent::Type::ACK: - nextOffsetToWait = getLargestDeliverableOffset(*stream); + maxOffsetReady = getLargestDeliverableOffset(*stream); + break; + case ByteEvent::Type::TX: + maxOffsetReady = getLargestWriteOffsetTxed(*stream); break; } - if (nextOffsetToWait.has_value() && (offset < *nextOffsetToWait)) { - // This byte event has already occurred + if (maxOffsetReady.has_value() && (offset <= *maxOffsetReady)) { runOnEvbAsync([id, cb, offset, type](auto selfObj) { if (selfObj->closeState_ != CloseState::OPEN) { // Close will error out all byte event callbacks. @@ -2150,12 +2251,11 @@ void QuicTransportBase::checkForClosedStream() { ++itr; continue; } - // We might be in the process of delivering all the delivery callbacks for - // the stream when we receive close stream. - auto deliveryCbCount = deliveryCallbacks_.count(*itr); - if (deliveryCbCount > 0) { - VLOG(10) << "Not closing stream=" << *itr - << " because it is waiting for the delivery callback"; + // If we have pending byte events, delay closing the stream + auto numByteEventCb = getNumByteEventCallbacksForStream(*itr); + if (numByteEventCb > 0) { + VLOG(10) << "Not closing stream=" << *itr << " because it has " + << numByteEventCb << " pending byte event callbacks"; ++itr; continue; } @@ -2542,6 +2642,7 @@ void QuicTransportBase::writeSocketDataAndCatch() { FOLLY_MAYBE_UNUSED auto self = sharedGuard(); try { writeSocketData(); + processCallbacksAfterWriteData(); } catch (const QuicTransportException& ex) { VLOG(4) << __func__ << ex.what() << " " << *this; exceptionCloseWhat_ = ex.what(); @@ -2777,9 +2878,23 @@ QuicTransportBase::ByteEventMap& QuicTransportBase::getByteEventMap( switch (type) { case ByteEvent::Type::ACK: return deliveryCallbacks_; + case ByteEvent::Type::TX: + return txCallbacks_; } LOG(FATAL) << "Unhandled case in getByteEventMap"; folly::assume_unreachable(); } +const QuicTransportBase::ByteEventMap& QuicTransportBase::getByteEventMapConst( + const ByteEvent::Type type) const { + switch (type) { + case ByteEvent::Type::ACK: + return deliveryCallbacks_; + case ByteEvent::Type::TX: + return txCallbacks_; + } + LOG(FATAL) << "Unhandled case in getByteEventMapConst"; + folly::assume_unreachable(); +} + } // namespace quic diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index cba698f90..162606c2a 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -313,6 +313,19 @@ class QuicTransportBase : public QuicSocket { */ void cancelDeliveryCallbacksForStream(StreamId id, uint64_t offset) override; + /** + * Register a callback to be invoked when the stream offset was transmitted. + * + * Currently, an offset is considered "transmitted" if it has been written to + * to the underlying UDP socket, indicating that it has passed through + * congestion control and pacing. In the future, this callback may be + * triggered by socket/NIC software or hardware timestamps. + */ + folly::Expected registerTxCallback( + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb) override; + /** * Register a byte event to be triggered when specified event type occurs for * the specified stream and offset. @@ -354,6 +367,19 @@ class QuicTransportBase : public QuicSocket { */ void cancelByteEventCallbacks(const ByteEvent::Type type) override; + /** + * Get the number of pending byte events for the given stream. + */ + [[nodiscard]] size_t getNumByteEventCallbacksForStream( + const StreamId id) const override; + + /** + * Get the number of pending byte events of specified type for given stream. + */ + [[nodiscard]] size_t getNumByteEventCallbacksForStream( + const ByteEvent::Type type, + const StreamId id) const override; + // Timeout functions class LossTimeout : public folly::HHWheelTimer::Callback { public: @@ -556,6 +582,7 @@ class QuicTransportBase : public QuicSocket { getInstrumentationObservers() const override; protected: + void processCallbacksAfterWriteData(); void processCallbacksAfterNetworkData(); void invokeReadDataAndCallbacks(); void invokePeekDataAndCallbacks(); @@ -639,6 +666,27 @@ class QuicTransportBase : public QuicSocket { using ByteEventMap = folly:: F14FastMap>>; ByteEventMap& getByteEventMap(const ByteEvent::Type type); + [[nodiscard]] const ByteEventMap& getByteEventMapConst( + const ByteEvent::Type type) const; + + /** + * Helper function that calls passed function for each ByteEvent type. + * + * Removes number of locations to update when a byte event is added. + */ + void invokeForEachByteEventType( + const std::function& fn) { + for (const auto& type : ByteEvent::kByteEventTypes) { + fn(type); + } + } + + void invokeForEachByteEventTypeConst( + const std::function& fn) const { + for (const auto& type : ByteEvent::kByteEventTypes) { + fn(type); + } + } std::atomic evb_; std::unique_ptr socket_; @@ -681,6 +729,7 @@ class QuicTransportBase : public QuicSocket { folly::F14FastMap peekCallbacks_; ByteEventMap deliveryCallbacks_; + ByteEventMap txCallbacks_; folly::F14FastMap dataExpiredCallbacks_; folly::F14FastMap dataRejectedCallbacks_; diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 498604a93..9af350363 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -498,6 +498,7 @@ void updateConnection( updateFlowControlOnWriteToSocket(*stream, writeStreamFrame.len); maybeWriteBlockAfterSocketWrite(*stream); conn.streamManager->updateWritableStreams(*stream); + conn.streamManager->addTx(writeStreamFrame.streamId); } conn.streamManager->updateLossStreams(*stream); break; diff --git a/quic/api/test/MockQuicSocket.h b/quic/api/test/MockQuicSocket.h index da46f349d..7722e39bc 100644 --- a/quic/api/test/MockQuicSocket.h +++ b/quic/api/test/MockQuicSocket.h @@ -140,13 +140,19 @@ class MockQuicSocket : public QuicSocket { MOCK_METHOD1( unregisterStreamWriteCallback, folly::Expected(StreamId)); + MOCK_METHOD3( + registerTxCallback, + folly::Expected( + const StreamId, + const uint64_t, + ByteEventCallback*)); MOCK_METHOD4( registerByteEventCallback, folly::Expected( const ByteEvent::Type, - const StreamId id, - const uint64_t offset, - ByteEventCallback* cb)); + const StreamId, + const uint64_t, + ByteEventCallback*)); MOCK_METHOD2( cancelByteEventCallbacksForStream, void(const StreamId id, const folly::Optional& offset)); @@ -158,6 +164,12 @@ class MockQuicSocket : public QuicSocket { const folly::Optional& offset)); MOCK_METHOD0(cancelAllByteEventCallbacks, void()); MOCK_METHOD1(cancelByteEventCallbacks, void(const ByteEvent::Type)); + MOCK_CONST_METHOD1( + getNumByteEventCallbacksForStream, + size_t(const StreamId id)); + MOCK_CONST_METHOD2( + getNumByteEventCallbacksForStream, + size_t(const ByteEvent::Type, const StreamId)); folly::Expected writeChain( StreamId id, Buf data, diff --git a/quic/api/test/Mocks.h b/quic/api/test/Mocks.h index 9ff06f7a1..b456f7cba 100644 --- a/quic/api/test/Mocks.h +++ b/quic/api/test/Mocks.h @@ -129,6 +129,31 @@ class MockDeliveryCallback : public QuicSocket::DeliveryCallback { MOCK_METHOD2(onCanceled, void(StreamId, uint64_t)); }; +class MockByteEventCallback : public QuicSocket::ByteEventCallback { + public: + ~MockByteEventCallback() override = default; + MOCK_METHOD1(onByteEvent, void(QuicSocket::ByteEvent)); + MOCK_METHOD1(onByteEventCanceled, void(QuicSocket::ByteEvent)); + + static auto getTxMatcher(StreamId id, uint64_t offset) { + return AllOf( + testing::Field( + &QuicSocket::ByteEvent::type, + testing::Eq(QuicSocket::ByteEvent::Type::TX)), + testing::Field(&QuicSocket::ByteEvent::id, testing::Eq(id)), + testing::Field(&QuicSocket::ByteEvent::offset, testing::Eq(offset))); + } + + static auto getAckMatcher(StreamId id, uint64_t offset) { + return AllOf( + testing::Field( + &QuicSocket::ByteEvent::type, + testing::Eq(QuicSocket::ByteEvent::Type::ACK)), + testing::Field(&QuicSocket::ByteEvent::id, testing::Eq(id)), + testing::Field(&QuicSocket::ByteEvent::offset, testing::Eq(offset))); + } +}; + class MockDataExpiredCallback : public QuicSocket::DataExpiredCallback { public: ~MockDataExpiredCallback() override = default; diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index dc846a285..d6b49860e 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -28,6 +28,7 @@ namespace quic { namespace test { constexpr uint8_t kStreamIncrement = 0x04; +using ByteEvent = QuicTransportBase::ByteEvent; enum class TestFrameType : uint8_t { STREAM, @@ -365,6 +366,22 @@ class TestQuicTransport } deliveryCallbacks_.erase(deliveryCb); } + + auto txCallbacksForStream = txCallbacks_.find(id); + if (txCallbacksForStream != txCallbacks_.end()) { + for (auto& cbs : txCallbacksForStream->second) { + ByteEvent event = {}; + event.id = id; + event.offset = cbs.first; + event.type = ByteEvent::Type::TX; + cbs.second->onByteEvent(event); + if (closeState_ != CloseState::OPEN) { + break; + } + } + txCallbacks_.erase(txCallbacksForStream); + } + SocketAddress addr("127.0.0.1", 1000); // some fake data to trigger close behavior. auto buf = encodeStreamBuffer( @@ -436,6 +453,10 @@ class QuicTransportImplTest : public Test { kDefaultMaxStreamsUnidirectional); } + auto getTxMatcher(StreamId id, uint64_t offset) { + return MockByteEventCallback::getTxMatcher(id, offset); + } + protected: std::unique_ptr evb; NiceMock connCallback; @@ -1365,91 +1386,621 @@ TEST_F(QuicTransportImplTest, DeliveryCallbackUnsetOne) { transport->close(folly::none); } -TEST_F(QuicTransportImplTest, DeliveryCallbackOnSendDataExpire) { +TEST_F(QuicTransportImplTest, TxDeliveryCallbackOnSendDataExpire) { transport->transportConn->partialReliabilityEnabled = true; auto stream1 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; NiceMock dcb1; NiceMock dcb2; + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream2, 10, &txcb2); transport->registerDeliveryCallback(stream1, 10, &dcb1); - transport->registerDeliveryCallback(stream2, 20, &dcb2); + transport->registerDeliveryCallback(stream2, 10, &dcb2); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0); EXPECT_CALL(dcb1, onCanceled(_, _)); EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0); auto res = transport->sendDataExpired(stream1, 11); EXPECT_EQ(res.hasError(), false); - + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); Mock::VerifyAndClearExpectations(&dcb1); Mock::VerifyAndClearExpectations(&dcb2); + EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10))); EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0); EXPECT_CALL(dcb2, onCanceled(_, _)); transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); } -TEST_F(QuicTransportImplTest, DeliveryCallbackOnSendDataExpireCallbacksLeft) { +TEST_F(QuicTransportImplTest, TxDeliveryCallbackOnSendDataExpireCallbacksLeft) { transport->transportConn->partialReliabilityEnabled = true; auto stream1 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; NiceMock dcb1; NiceMock dcb2; + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream1, 20, &txcb1); + transport->registerTxCallback(stream2, 10, &txcb2); + transport->registerTxCallback(stream2, 20, &txcb2); transport->registerDeliveryCallback(stream1, 10, &dcb1); transport->registerDeliveryCallback(stream1, 20, &dcb1); + transport->registerDeliveryCallback(stream2, 10, &dcb2); transport->registerDeliveryCallback(stream2, 20, &dcb2); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0); EXPECT_CALL(dcb1, onCanceled(_, _)); EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0); auto res = transport->sendDataExpired(stream1, 11); EXPECT_EQ(res.hasError(), false); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); Mock::VerifyAndClearExpectations(&dcb1); Mock::VerifyAndClearExpectations(&dcb2); - EXPECT_CALL(dcb2, onCanceled(_, _)); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 20))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20))); EXPECT_CALL(dcb1, onCanceled(_, _)).Times(1); + EXPECT_CALL(dcb2, onCanceled(_, _)).Times(2); transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); } -TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackLowerThanExpected) { +TEST_F(QuicTransportImplTest, RegisterTxDeliveryCallbackLowerThanExpected) { auto stream = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + StrictMock txcb3; NiceMock dcb1; NiceMock dcb2; NiceMock dcb3; + transport->registerTxCallback(stream, 10, &txcb1); + transport->registerTxCallback(stream, 20, &txcb2); transport->registerDeliveryCallback(stream, 10, &dcb1); transport->registerDeliveryCallback(stream, 20, &dcb2); auto streamState = transport->transportConn->streamManager->getStream(stream); streamState->currentWriteOffset = 7; streamState->ackedIntervals.insert(0, 6); - EXPECT_CALL(dcb3, onDeliveryAck(_, _, _)) - .WillOnce(Invoke( - [](auto /* id */, auto offset, auto) { EXPECT_EQ(offset, 2); })); + EXPECT_CALL(txcb3, onByteEvent(getTxMatcher(stream, 2))); + EXPECT_CALL(dcb3, onDeliveryAck(stream, 2, _)); + transport->registerTxCallback(stream, 2, &txcb3); transport->registerDeliveryCallback(stream, 2, &dcb3); evb->loopOnce(); + Mock::VerifyAndClearExpectations(&txcb3); + Mock::VerifyAndClearExpectations(&dcb3); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream, 20))); EXPECT_CALL(dcb1, onCanceled(_, _)); EXPECT_CALL(dcb2, onCanceled(_, _)); transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&txcb3); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); + Mock::VerifyAndClearExpectations(&dcb3); } -TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackLowerThanExpectedClose) { +TEST_F( + QuicTransportImplTest, + RegisterTxDeliveryCallbackLowerThanExpectedClose) { auto stream = transport->createBidirectionalStream().value(); + StrictMock txcb; NiceMock dcb; auto streamState = transport->transportConn->streamManager->getStream(stream); streamState->currentWriteOffset = 7; + EXPECT_CALL(txcb, onByteEventCanceled(getTxMatcher(stream, 2))); EXPECT_CALL(dcb, onCanceled(_, _)); + transport->registerTxCallback(stream, 2, &txcb); transport->registerDeliveryCallback(stream, 2, &dcb); transport->close(folly::none); evb->loopOnce(); + Mock::VerifyAndClearExpectations(&txcb); + Mock::VerifyAndClearExpectations(&dcb); +} + +TEST_F(QuicTransportImplTest, CancelAllByteEventCallbacks) { + auto stream1 = transport->createBidirectionalStream().value(); + auto stream2 = transport->createBidirectionalStream().value(); + + NiceMock txcb1; + NiceMock txcb2; + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream2, 20, &txcb2); + + NiceMock dcb1; + NiceMock dcb2; + transport->registerDeliveryCallback(stream1, 10, &dcb1); + transport->registerDeliveryCallback(stream2, 20, &dcb2); + + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20))); + EXPECT_CALL(dcb1, onCanceled(_, _)); + EXPECT_CALL(dcb2, onCanceled(_, _)); + + transport->cancelAllByteEventCallbacks(); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); + + EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0); + EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0); + EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0); + EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0); + + transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); +} + +TEST_F(QuicTransportImplTest, CancelByteEventCallbacksForStream) { + auto stream1 = transport->createBidirectionalStream().value(); + auto stream2 = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + NiceMock dcb1; + NiceMock dcb2; + + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream2, 20, &txcb2); + transport->registerDeliveryCallback(stream1, 10, &dcb1); + transport->registerDeliveryCallback(stream2, 20, &dcb2); + + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0); + EXPECT_CALL(dcb1, onCanceled(stream1, 10)); + EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0); + + transport->cancelByteEventCallbacksForStream(stream1); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); + + EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 1, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20))); + EXPECT_CALL(dcb1, onCanceled(stream1, _)).Times(0); + EXPECT_CALL(dcb2, onCanceled(_, 20)); + + transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); +} + +TEST_F(QuicTransportImplTest, CancelByteEventCallbacksForStreamWithOffset) { + auto stream1 = transport->createBidirectionalStream().value(); + auto stream2 = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + NiceMock dcb1; + NiceMock dcb2; + + EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream1, 15, &txcb1); + transport->registerTxCallback(stream1, 20, &txcb1); + transport->registerTxCallback(stream2, 10, &txcb2); + transport->registerTxCallback(stream2, 15, &txcb2); + transport->registerTxCallback(stream2, 20, &txcb2); + + EXPECT_EQ(3, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(3, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + transport->registerDeliveryCallback(stream1, 10, &dcb1); + transport->registerDeliveryCallback(stream1, 15, &dcb1); + transport->registerDeliveryCallback(stream1, 20, &dcb1); + transport->registerDeliveryCallback(stream2, 10, &dcb2); + transport->registerDeliveryCallback(stream2, 15, &dcb2); + transport->registerDeliveryCallback(stream2, 20, &dcb2); + + EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))); + EXPECT_CALL(dcb1, onCanceled(stream1, 10)); + + // cancels if offset is < (not <=) offset provided + transport->cancelByteEventCallbacksForStream(stream1, 15); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); + + EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15))); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 20))); + EXPECT_CALL(dcb1, onCanceled(stream1, 15)); + EXPECT_CALL(dcb1, onCanceled(stream1, 20)); + + // cancels if offset is < (not <=) offset provided + transport->cancelByteEventCallbacksForStream(stream1, 21); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); + + EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 3, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20))); + EXPECT_CALL(dcb2, onCanceled(stream2, 10)); + EXPECT_CALL(dcb2, onCanceled(stream2, 15)); + EXPECT_CALL(dcb2, onCanceled(stream2, 20)); + + transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); + + EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); +} + +TEST_F(QuicTransportImplTest, CancelByteEventCallbacksTx) { + auto stream1 = transport->createBidirectionalStream().value(); + auto stream2 = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + NiceMock dcb1; + NiceMock dcb2; + + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream1, 15, &txcb1); + transport->registerTxCallback(stream2, 10, &txcb2); + transport->registerTxCallback(stream2, 15, &txcb2); + transport->registerDeliveryCallback(stream1, 10, &dcb1); + transport->registerDeliveryCallback(stream1, 15, &dcb1); + transport->registerDeliveryCallback(stream2, 10, &dcb2); + transport->registerDeliveryCallback(stream2, 15, &dcb2); + + EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15))); + + transport->cancelByteEventCallbacks(ByteEvent::Type::TX); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); + + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(dcb1, onCanceled(stream1, 10)); + EXPECT_CALL(dcb1, onCanceled(stream1, 15)); + EXPECT_CALL(dcb2, onCanceled(stream2, 10)); + EXPECT_CALL(dcb2, onCanceled(stream2, 15)); + + transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); +} + +TEST_F(QuicTransportImplTest, CancelByteEventCallbacksDelivery) { + auto stream1 = transport->createBidirectionalStream().value(); + auto stream2 = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; + NiceMock dcb1; + NiceMock dcb2; + + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream1, 15, &txcb1); + transport->registerTxCallback(stream2, 10, &txcb2); + transport->registerTxCallback(stream2, 15, &txcb2); + transport->registerDeliveryCallback(stream1, 10, &dcb1); + transport->registerDeliveryCallback(stream1, 15, &dcb1); + transport->registerDeliveryCallback(stream2, 10, &dcb2); + transport->registerDeliveryCallback(stream2, 15, &dcb2); + + EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(dcb1, onCanceled(stream1, 10)); + EXPECT_CALL(dcb1, onCanceled(stream1, 15)); + EXPECT_CALL(dcb2, onCanceled(stream2, 10)); + EXPECT_CALL(dcb2, onCanceled(stream2, 15)); + + transport->cancelByteEventCallbacks(ByteEvent::Type::ACK); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); + + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1)); + EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream1)); + EXPECT_EQ( + 2, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::TX, stream2)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream1)); + EXPECT_EQ( + 0, + transport->getNumByteEventCallbacksForStream( + ByteEvent::Type::ACK, stream2)); + + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10))); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15))); + + transport->close(folly::none); + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); + Mock::VerifyAndClearExpectations(&dcb1); + Mock::VerifyAndClearExpectations(&dcb2); } TEST_F(QuicTransportImplTest, TestNotifyPendingConnWriteOnCloseWithoutError) { @@ -1519,6 +2070,7 @@ TEST_P(QuicTransportImplTestClose, TestNotifyPendingWriteOnCloseWithError) { } evb->loopOnce(); } + TEST_F(QuicTransportImplTest, TestTransportCloseWithMaxPacketNumber) { transport->setServerConnectionId(); transport->transportConn->pendingEvents.closeTransport = false; @@ -1527,6 +2079,7 @@ TEST_F(QuicTransportImplTest, TestTransportCloseWithMaxPacketNumber) { transport->transportConn->pendingEvents.closeTransport = true; EXPECT_THROW(transport->invokeWriteSocketData(), QuicTransportException); } + TEST_F(QuicTransportImplTest, TestGracefulCloseWithActiveStream) { EXPECT_CALL(connCallback, onConnectionEnd()).Times(0); EXPECT_CALL(connCallback, onConnectionError(_)).Times(0); @@ -1535,13 +2088,16 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithActiveStream) { NiceMock wcb; NiceMock wcbConn; NiceMock rcb; - NiceMock deliveryCb; + StrictMock txCb; + StrictMock deliveryCb; EXPECT_CALL( wcb, onStreamWriteError(stream, IsError(LocalErrorCode::NO_ERROR))); EXPECT_CALL( wcbConn, onConnectionWriteError(IsError(LocalErrorCode::NO_ERROR))); EXPECT_CALL(rcb, readError(stream, IsError(LocalErrorCode::NO_ERROR))); EXPECT_CALL(deliveryCb, onCanceled(stream, _)); + EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 0))); + EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 4))); transport->notifyPendingWriteOnConnection(&wcbConn); transport->notifyPendingWriteOnStream(stream, &wcb); @@ -1550,6 +2106,8 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithActiveStream) { .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1)); transport->writeChain( stream, IOBuf::copyBuffer("hello"), true, false, &deliveryCb); + EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError()); + EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError()); transport->closeGracefully(); ASSERT_FALSE(transport->transportClosed); @@ -1558,6 +2116,7 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithActiveStream) { EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError()); EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError()); EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError()); + EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError()); EXPECT_TRUE( transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError()); EXPECT_TRUE( @@ -1584,9 +2143,12 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) { NiceMock wcbConn; NiceMock rcb; NiceMock deliveryCb; + NiceMock txCb; EXPECT_CALL( rcb, readError(stream, IsError(GenericApplicationErrorCode::NO_ERROR))); EXPECT_CALL(deliveryCb, onDeliveryAck(stream, _, _)); + EXPECT_CALL(txCb, onByteEvent(getTxMatcher(stream, 0))); + EXPECT_CALL(txCb, onByteEvent(getTxMatcher(stream, 4))); EXPECT_CALL(connCallback, onConnectionEnd()).Times(0); EXPECT_CALL(connCallback, onConnectionError(_)).Times(0); @@ -1596,11 +2158,14 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) { .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1)); transport->writeChain( stream, IOBuf::copyBuffer("hello"), true, false, &deliveryCb); + EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError()); + EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError()); // Close the last stream. auto streamState = transport->transportConn->streamManager->getStream(stream); - // Fake that the data was delivered to keep all the state consistent. + // Fake that the data was TXed and delivered to keep all the state consistent. streamState->currentWriteOffset = 7; + transport->transportConn->streamManager->addTx(stream); transport->transportConn->streamManager->addDeliverable(stream); transport->closeStream(stream); transport->close(folly::none); @@ -1611,6 +2176,7 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) { EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError()); EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError()); EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError()); + EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError()); EXPECT_TRUE( transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError()); EXPECT_TRUE( @@ -1624,6 +2190,7 @@ TEST_F(QuicTransportImplTest, TestImmediateClose) { NiceMock wcbConn; NiceMock rcb; NiceMock deliveryCb; + NiceMock txCb; EXPECT_CALL( wcb, onStreamWriteError( @@ -1634,6 +2201,8 @@ TEST_F(QuicTransportImplTest, TestImmediateClose) { EXPECT_CALL( rcb, readError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN))); EXPECT_CALL(deliveryCb, onCanceled(stream, _)); + EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 0))); + EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 4))); EXPECT_CALL(connCallback, onConnectionError(_)).Times(0); @@ -1644,6 +2213,8 @@ TEST_F(QuicTransportImplTest, TestImmediateClose) { .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1)); transport->writeChain( stream, IOBuf::copyBuffer("hello"), true, false, &deliveryCb); + EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError()); + EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError()); transport->close(std::make_pair( QuicErrorCode(GenericApplicationErrorCode::UNKNOWN), std::string("Error"))); @@ -1654,6 +2225,7 @@ TEST_F(QuicTransportImplTest, TestImmediateClose) { EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError()); EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError()); EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError()); + EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError()); EXPECT_TRUE( transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError()); EXPECT_TRUE( @@ -1863,6 +2435,20 @@ TEST_F(QuicTransportImplTest, UnidirectionalInvalidWriteFuncs) { transport->registerDeliveryCallback(stream, 0, nullptr) .thenOrThrow([&](auto) {}), folly::Unexpected::BadExpectedAccess); + EXPECT_THROW( + transport->registerTxCallback(stream, 0, nullptr).thenOrThrow([&](auto) { + }), + folly::Unexpected::BadExpectedAccess); + EXPECT_THROW( + transport + ->registerByteEventCallback(ByteEvent::Type::ACK, stream, 0, nullptr) + .thenOrThrow([&](auto) {}), + folly::Unexpected::BadExpectedAccess); + EXPECT_THROW( + transport + ->registerByteEventCallback(ByteEvent::Type::TX, stream, 0, nullptr) + .thenOrThrow([&](auto) {}), + folly::Unexpected::BadExpectedAccess); EXPECT_THROW( transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN) .thenOrThrow([&](auto) {}), @@ -2524,43 +3110,58 @@ TEST_F(QuicTransportImplTest, DataRejecteddCallbackDataAvailable) { transport.reset(); } -TEST_F(QuicTransportImplTest, DataRejecteddCallbackWithDeliveryCallbacks) { +TEST_F(QuicTransportImplTest, DataRejecteddCallbackWithTxAndDeliveryCallbacks) { transport->transportConn->partialReliabilityEnabled = true; auto stream1 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; NiceMock dcb1; NiceMock dcb2; NiceMock dataRejectedCb1; NiceMock dataRejectedCb2; + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream2, 20, &txcb2); + transport->registerDeliveryCallback(stream1, 10, &dcb1); transport->registerDeliveryCallback(stream2, 20, &dcb2); transport->setDataRejectedCallback(stream1, &dataRejectedCb1); transport->setDataRejectedCallback(stream2, &dataRejectedCb2); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))).Times(1); + EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0); EXPECT_CALL(dcb1, onCanceled(stream1, 10)).Times(1); EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0); EXPECT_CALL(dataRejectedCb1, onDataRejected(stream1, 15)); + transport->addMinStreamDataFrameToStream( MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 15)); - + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); Mock::VerifyAndClearExpectations(&dcb1); Mock::VerifyAndClearExpectations(&dcb2); Mock::VerifyAndClearExpectations(&dataRejectedCb1); + EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20))).Times(1); EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0); EXPECT_CALL(dcb2, onCanceled(stream2, 20)).Times(1); EXPECT_CALL(dataRejectedCb2, onDataRejected(stream2, 23)); + transport->addMinStreamDataFrameToStream( MinStreamDataFrame(stream2, kDefaultStreamWindowSize, 23)); - + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); Mock::VerifyAndClearExpectations(&dcb1); Mock::VerifyAndClearExpectations(&dcb2); Mock::VerifyAndClearExpectations(&dataRejectedCb2); + EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0); + EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0); EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0); EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0); transport->close(folly::none); @@ -2568,17 +3169,24 @@ TEST_F(QuicTransportImplTest, DataRejecteddCallbackWithDeliveryCallbacks) { TEST_F( QuicTransportImplTest, - DataRejecteddCallbackWithDeliveryCallbacksSomeLeft) { + DataRejecteddCallbackWithTxAndDeliveryCallbacksSomeLeft) { transport->transportConn->partialReliabilityEnabled = true; auto stream1 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value(); + StrictMock txcb1; + StrictMock txcb2; NiceMock dcb1; NiceMock dcb2; NiceMock dataRejectedCb1; NiceMock dataRejectedCb2; + transport->registerTxCallback(stream1, 10, &txcb1); + transport->registerTxCallback(stream1, 25, &txcb1); + transport->registerTxCallback(stream2, 20, &txcb2); + transport->registerTxCallback(stream2, 29, &txcb2); + transport->registerDeliveryCallback(stream1, 10, &dcb1); transport->registerDeliveryCallback(stream1, 25, &dcb1); transport->registerDeliveryCallback(stream2, 20, &dcb2); @@ -2587,26 +3195,36 @@ TEST_F( transport->setDataRejectedCallback(stream1, &dataRejectedCb1); transport->setDataRejectedCallback(stream2, &dataRejectedCb2); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))).Times(1); + EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0); EXPECT_CALL(dcb1, onCanceled(stream1, 10)).Times(1); EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0); EXPECT_CALL(dataRejectedCb1, onDataRejected(stream1, 15)); + transport->addMinStreamDataFrameToStream( MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 15)); - + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); Mock::VerifyAndClearExpectations(&dcb1); Mock::VerifyAndClearExpectations(&dcb2); Mock::VerifyAndClearExpectations(&dataRejectedCb1); + EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20))).Times(1); EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0); EXPECT_CALL(dcb2, onCanceled(stream2, 20)).Times(1); EXPECT_CALL(dataRejectedCb2, onDataRejected(stream2, 23)); + transport->addMinStreamDataFrameToStream( MinStreamDataFrame(stream2, kDefaultStreamWindowSize, 23)); - + Mock::VerifyAndClearExpectations(&txcb1); + Mock::VerifyAndClearExpectations(&txcb2); Mock::VerifyAndClearExpectations(&dcb1); Mock::VerifyAndClearExpectations(&dcb2); Mock::VerifyAndClearExpectations(&dataRejectedCb2); + EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 25))).Times(1); + EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 29))).Times(1); EXPECT_CALL(dcb1, onCanceled(stream1, 25)).Times(1); EXPECT_CALL(dcb2, onCanceled(stream2, 29)).Times(1); transport->close(folly::none); diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 88a252cf7..92e3dbc2f 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -106,6 +106,10 @@ class QuicTransportTest : public Test { evb_.loopOnce(EVLOOP_NONBLOCK); } + auto getTxMatcher(StreamId id, uint64_t offset) { + return MockByteEventCallback::getTxMatcher(id, offset); + } + protected: folly::EventBase evb_; MockAsyncUDPSocket* socket_; @@ -1966,6 +1970,414 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) { transport_->onNetworkData(addr, std::move(emptyData2)); } +TEST_F(QuicTransportTest, InvokeDeliveryCallbacksSingleByte) { + // register all possible ways to get a DeliveryCb + // + // applications built atop QUIC may capture both first and last byte timings, + // which in this test are the same byte + StrictMock writeChainDeliveryCb; + StrictMock firstByteDeliveryCb; + StrictMock lastByteDeliveryCb; + StrictMock unsentByteDeliveryCb; + auto stream = transport_->createBidirectionalStream().value(); + + auto buf = buildRandomInputData(1); + transport_->writeChain( + stream, + buf->clone(), + false /* eof */, + false /* cork */, + &writeChainDeliveryCb); + transport_->registerDeliveryCallback(stream, 0, &firstByteDeliveryCb); + transport_->registerDeliveryCallback(stream, 0, &lastByteDeliveryCb); + transport_->registerDeliveryCallback(stream, 1, &unsentByteDeliveryCb); + + // writeChain, first, last byte callbacks triggered after delivery + auto& conn = transport_->getConnectionState(); + folly::SocketAddress addr; + conn.streamManager->addDeliverable(stream); + conn.lossState.srtt = 100us; + NetworkData networkData; + auto streamState = conn.streamManager->getStream(stream); + streamState->ackedIntervals.insert(0, 0); + EXPECT_CALL(writeChainDeliveryCb, onDeliveryAck(stream, 0, 100us)).Times(1); + EXPECT_CALL(firstByteDeliveryCb, onDeliveryAck(stream, 0, 100us)).Times(1); + EXPECT_CALL(lastByteDeliveryCb, onDeliveryAck(stream, 0, 100us)).Times(1); + transport_->onNetworkData(addr, std::move(networkData)); + Mock::VerifyAndClearExpectations(&writeChainDeliveryCb); + Mock::VerifyAndClearExpectations(&firstByteDeliveryCb); + Mock::VerifyAndClearExpectations(&lastByteDeliveryCb); + + // try to set both offsets again + // callbacks should be triggered immediately + EXPECT_CALL(firstByteDeliveryCb, onDeliveryAck(stream, 0, _)).Times(1); + EXPECT_CALL(lastByteDeliveryCb, onDeliveryAck(stream, 0, _)).Times(1); + transport_->registerDeliveryCallback(stream, 0, &firstByteDeliveryCb); + transport_->registerDeliveryCallback(stream, 0, &lastByteDeliveryCb); + loopForWrites(); + Mock::VerifyAndClearExpectations(&firstByteDeliveryCb); + Mock::VerifyAndClearExpectations(&lastByteDeliveryCb); + + // unsentByteDeliveryCb::onByteEvent will never get called + // cancel gets called instead + EXPECT_CALL(unsentByteDeliveryCb, onCanceled(stream, 1)).Times(1); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(&unsentByteDeliveryCb); +} + +TEST_F(QuicTransportTest, InvokeDeliveryCallbacksSingleByteWithFin) { + // register all possible ways to get a DeliveryCb + // + // applications built atop QUIC may capture both first and last byte timings, + // which in this test are the same byte + StrictMock writeChainDeliveryCb; + StrictMock firstByteDeliveryCb; + StrictMock lastByteDeliveryCb; + StrictMock finDeliveryCb; + StrictMock unsentByteDeliveryCb; + auto stream = transport_->createBidirectionalStream().value(); + + auto buf = buildRandomInputData(1); + transport_->writeChain( + stream, + buf->clone(), + true /* eof */, + false /* cork */, + &writeChainDeliveryCb); + transport_->registerDeliveryCallback(stream, 0, &firstByteDeliveryCb); + transport_->registerDeliveryCallback(stream, 0, &lastByteDeliveryCb); + transport_->registerDeliveryCallback(stream, 1, &finDeliveryCb); + transport_->registerDeliveryCallback(stream, 2, &unsentByteDeliveryCb); + + // writeChain, first, last byte, fin callbacks triggered after delivery + auto& conn = transport_->getConnectionState(); + folly::SocketAddress addr; + conn.streamManager->addDeliverable(stream); + conn.lossState.srtt = 100us; + NetworkData networkData; + auto streamState = conn.streamManager->getStream(stream); + streamState->ackedIntervals.insert(0, 1); + EXPECT_CALL(writeChainDeliveryCb, onDeliveryAck(stream, 1, 100us)).Times(1); + EXPECT_CALL(firstByteDeliveryCb, onDeliveryAck(stream, 0, 100us)).Times(1); + EXPECT_CALL(lastByteDeliveryCb, onDeliveryAck(stream, 0, 100us)).Times(1); + EXPECT_CALL(finDeliveryCb, onDeliveryAck(stream, 1, 100us)).Times(1); + transport_->onNetworkData(addr, std::move(networkData)); + Mock::VerifyAndClearExpectations(&writeChainDeliveryCb); + Mock::VerifyAndClearExpectations(&firstByteDeliveryCb); + Mock::VerifyAndClearExpectations(&lastByteDeliveryCb); + + // try to set all three offsets again + // callbacks should be triggered immediately + EXPECT_CALL(firstByteDeliveryCb, onDeliveryAck(stream, 0, _)).Times(1); + EXPECT_CALL(lastByteDeliveryCb, onDeliveryAck(stream, 0, _)).Times(1); + EXPECT_CALL(finDeliveryCb, onDeliveryAck(stream, 1, _)).Times(1); + transport_->registerDeliveryCallback(stream, 0, &firstByteDeliveryCb); + transport_->registerDeliveryCallback(stream, 0, &lastByteDeliveryCb); + transport_->registerDeliveryCallback(stream, 1, &finDeliveryCb); + loopForWrites(); + Mock::VerifyAndClearExpectations(&firstByteDeliveryCb); + Mock::VerifyAndClearExpectations(&lastByteDeliveryCb); + Mock::VerifyAndClearExpectations(&finDeliveryCb); + + // unsentByteDeliveryCb::onByteEvent will never get called + // cancel gets called instead + EXPECT_CALL(unsentByteDeliveryCb, onCanceled(stream, 2)).Times(1); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(&unsentByteDeliveryCb); +} + +TEST_F(QuicTransportTest, InvokeTxCallbacksSingleByte) { + StrictMock firstByteTxCb; + StrictMock lastByteTxCb; + StrictMock pastlastByteTxCb; + auto stream = transport_->createBidirectionalStream().value(); + + auto buf = buildRandomInputData(1); + transport_->writeChain( + stream, buf->clone(), false /* eof */, false /* cork */); + transport_->registerTxCallback(stream, 0, &firstByteTxCb); + transport_->registerTxCallback(stream, 0, &lastByteTxCb); + transport_->registerTxCallback(stream, 1, &pastlastByteTxCb); + + // first and last byte TX callbacks should be triggered immediately + EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&firstByteTxCb); + Mock::VerifyAndClearExpectations(&lastByteTxCb); + + // try to set the first and last byte offsets again + // callbacks should be triggered immediately + EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + transport_->registerTxCallback(stream, 0, &firstByteTxCb); + transport_->registerTxCallback(stream, 0, &lastByteTxCb); + loopForWrites(); // have to loop since processed async + Mock::VerifyAndClearExpectations(&firstByteTxCb); + Mock::VerifyAndClearExpectations(&lastByteTxCb); + + // even if we register pastlastByte again, it shouldn't be triggered + transport_->registerTxCallback(stream, 1, &pastlastByteTxCb); + + // pastlastByteTxCb::onByteEvent will never get called + // cancel gets called instead + // onByteEventCanceled called twice, since added twice + EXPECT_CALL(pastlastByteTxCb, onByteEventCanceled(getTxMatcher(stream, 1))) + .Times(2); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(&pastlastByteTxCb); +} + +TEST_F(QuicTransportTest, InvokeTxCallbacksSingleByteWithFin) { + StrictMock firstByteTxCb; + StrictMock lastByteTxCb; + StrictMock finTxCb; + StrictMock pastlastByteTxCb; + auto stream = transport_->createBidirectionalStream().value(); + + auto buf = buildRandomInputData(1); + transport_->writeChain( + stream, buf->clone(), true /* eof */, false /* cork */); + transport_->registerTxCallback(stream, 0, &firstByteTxCb); + transport_->registerTxCallback(stream, 0, &lastByteTxCb); + transport_->registerTxCallback(stream, 1, &finTxCb); + transport_->registerTxCallback(stream, 2, &pastlastByteTxCb); + + // first, last byte, and fin TX callbacks should be triggered immediately + EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + EXPECT_CALL(finTxCb, onByteEvent(getTxMatcher(stream, 1))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&firstByteTxCb); + Mock::VerifyAndClearExpectations(&lastByteTxCb); + Mock::VerifyAndClearExpectations(&finTxCb); + + // try to set all three offsets again + // callbacks should be triggered immediately + EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + EXPECT_CALL(finTxCb, onByteEvent(getTxMatcher(stream, 1))).Times(1); + transport_->registerTxCallback(stream, 0, &firstByteTxCb); + transport_->registerTxCallback(stream, 0, &lastByteTxCb); + transport_->registerTxCallback(stream, 1, &finTxCb); + loopForWrites(); // have to loop since processed async + Mock::VerifyAndClearExpectations(&firstByteTxCb); + Mock::VerifyAndClearExpectations(&lastByteTxCb); + Mock::VerifyAndClearExpectations(&finTxCb); + + // pastlastByteTxCb::onByteEvent will never get called + // cancel gets called instead + EXPECT_CALL(pastlastByteTxCb, onByteEventCanceled(getTxMatcher(stream, 2))) + .Times(1); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(&pastlastByteTxCb); +} + +TEST_F(QuicTransportTest, InvokeTxCallbacksMultipleBytes) { + const uint64_t streamBytes = 10; + const uint64_t lastByte = streamBytes - 1; + + StrictMock firstByteTxCb; + StrictMock lastByteTxCb; + StrictMock pastlastByteTxCb; + auto stream = transport_->createBidirectionalStream().value(); + + auto buf = buildRandomInputData(streamBytes); + CHECK_EQ(streamBytes, buf->length()); + transport_->writeChain( + stream, buf->clone(), false /* eof */, false /* cork */); + transport_->registerTxCallback(stream, 0, &firstByteTxCb); + transport_->registerTxCallback(stream, lastByte, &lastByteTxCb); + transport_->registerTxCallback(stream, lastByte + 1, &pastlastByteTxCb); + + // first and last byte TX callbacks should be triggered immediately + EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, lastByte))) + .Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&firstByteTxCb); + Mock::VerifyAndClearExpectations(&lastByteTxCb); + + // try to set the first and last byte offsets again + // callbacks should be triggered immediately + EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, lastByte))) + .Times(1); + transport_->registerTxCallback(stream, 0, &firstByteTxCb); + transport_->registerTxCallback(stream, lastByte, &lastByteTxCb); + loopForWrites(); // have to loop since processed async + Mock::VerifyAndClearExpectations(&firstByteTxCb); + Mock::VerifyAndClearExpectations(&lastByteTxCb); + + // pastlastByteTxCb::onByteEvent will never get called + // cancel gets called instead + EXPECT_CALL( + pastlastByteTxCb, onByteEventCanceled(getTxMatcher(stream, lastByte + 1))) + .Times(1); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(&pastlastByteTxCb); +} + +TEST_F(QuicTransportTest, InvokeTxCallbacksMultipleBytesWriteRateLimited) { + // configure connection to write one packet each round + auto& conn = transport_->getConnectionState(); + conn.transportSettings.writeConnectionDataPacketsLimit = 1; + + StrictMock firstByteTxCb; + StrictMock secondPacketByteOffsetTxCb; + StrictMock lastByteTxCb; + StrictMock pastlastByteTxCb; + auto stream = transport_->createBidirectionalStream().value(); + + const uint64_t streamBytes = kDefaultUDPSendPacketLen * 4; + const uint64_t lastByte = streamBytes - 1; + auto buf = buildRandomInputData(streamBytes); + CHECK_EQ(streamBytes, buf->length()); + transport_->writeChain( + stream, buf->clone(), false /* eof */, false /* cork */); + transport_->registerTxCallback(stream, 0, &firstByteTxCb); + transport_->registerTxCallback( + stream, kDefaultUDPSendPacketLen * 2, &secondPacketByteOffsetTxCb); + transport_->registerTxCallback(stream, lastByte, &lastByteTxCb); + transport_->registerTxCallback(stream, lastByte + 1, &pastlastByteTxCb); + + // first byte gets TXed on first call to loopForWrites + EXPECT_CALL(firstByteTxCb, onByteEvent(getTxMatcher(stream, 0))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&firstByteTxCb); + + // second packet byte offset gets TXed on second call to loopForWrites + EXPECT_CALL( + secondPacketByteOffsetTxCb, + onByteEvent(getTxMatcher(stream, kDefaultUDPSendPacketLen * 2))) + .Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&lastByteTxCb); + + // nothing happens on third or fourth call to loopForWrites + loopForWrites(); + loopForWrites(); + + // due to overhead, last byte gets TXed on fifth call to loopForWrites + EXPECT_CALL(lastByteTxCb, onByteEvent(getTxMatcher(stream, lastByte))) + .Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&lastByteTxCb); + + // pastlastByteTxCb::onByteEvent will never get called + // cancel gets called instead + EXPECT_CALL( + pastlastByteTxCb, onByteEventCanceled(getTxMatcher(stream, lastByte + 1))) + .Times(1); + transport_->close(folly::none); + Mock::VerifyAndClearExpectations(&pastlastByteTxCb); +} + +TEST_F(QuicTransportTest, InvokeTxCallbacksMultipleBytesMultipleWrites) { + // configure connection to write one packet each round + auto& conn = transport_->getConnectionState(); + conn.transportSettings.writeConnectionDataPacketsLimit = 1; + + StrictMock txCb1; + StrictMock txCb2; + StrictMock txCb3; + auto stream = transport_->createBidirectionalStream().value(); + + // call writeChain, writing 10 bytes + { + auto buf = buildRandomInputData(10); + transport_->writeChain( + stream, buf->clone(), false /* eof */, false /* cork */); + } + transport_->registerTxCallback(stream, 0, &txCb1); + EXPECT_CALL(txCb1, onByteEvent(getTxMatcher(stream, 0))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&txCb1); + + // call writeChain and write another 10 bytes + { + auto buf = buildRandomInputData(10); + transport_->writeChain( + stream, buf->clone(), false /* eof */, false /* cork */); + } + transport_->registerTxCallback(stream, 10, &txCb2); + EXPECT_CALL(txCb2, onByteEvent(getTxMatcher(stream, 10))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&txCb2); + + // write the fin + { + auto buf = buildRandomInputData(0); + transport_->writeChain( + stream, buf->clone(), true /* eof */, false /* cork */); + } + transport_->registerTxCallback(stream, 20, &txCb3); + EXPECT_CALL(txCb3, onByteEvent(getTxMatcher(stream, 20))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&txCb3); +} + +TEST_F( + QuicTransportTest, + InvokeTxAndDeliveryCallbacksMultipleBytesMultipleWrites) { + // configure connection to write one packet each round + auto& conn = transport_->getConnectionState(); + conn.transportSettings.writeConnectionDataPacketsLimit = 1; + + StrictMock txCb1; + StrictMock txCb2; + StrictMock txCb3; + + StrictMock deliveryCb1; + StrictMock deliveryCb2; + StrictMock deliveryCb3; + auto stream = transport_->createBidirectionalStream().value(); + + // call writeChain, writing 10 bytes + { + auto buf = buildRandomInputData(10); + transport_->writeChain( + stream, buf->clone(), false /* eof */, false /* cork */, &deliveryCb1); + } + transport_->registerTxCallback(stream, 0, &txCb1); + EXPECT_CALL(txCb1, onByteEvent(getTxMatcher(stream, 0))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&txCb1); + + // call writeChain and write another 10 bytes + { + auto buf = buildRandomInputData(10); + transport_->writeChain( + stream, buf->clone(), false /* eof */, false /* cork */, &deliveryCb2); + } + transport_->registerTxCallback(stream, 10, &txCb2); + EXPECT_CALL(txCb2, onByteEvent(getTxMatcher(stream, 10))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&txCb2); + + // write the fin + { + auto buf = buildRandomInputData(0); + transport_->writeChain( + stream, buf->clone(), true /* eof */, false /* cork */, &deliveryCb3); + } + transport_->registerTxCallback(stream, 20, &txCb3); + EXPECT_CALL(txCb3, onByteEvent(getTxMatcher(stream, 20))).Times(1); + loopForWrites(); + Mock::VerifyAndClearExpectations(&txCb3); + + folly::SocketAddress addr; + conn.streamManager->addDeliverable(stream); + conn.lossState.srtt = 100us; + NetworkData networkData; + auto streamState = conn.streamManager->getStream(stream); + streamState->ackedIntervals.insert(0, 20); + EXPECT_CALL(deliveryCb1, onDeliveryAck(stream, 9, 100us)).Times(1); + EXPECT_CALL(deliveryCb2, onDeliveryAck(stream, 19, 100us)).Times(1); + EXPECT_CALL(deliveryCb3, onDeliveryAck(stream, 20, 100us)).Times(1); + transport_->onNetworkData(addr, std::move(networkData)); +} + TEST_F(QuicTransportTest, NotifyPendingWriteConnImmediate) { EXPECT_CALL(writeCallback_, onConnectionWriteReady(_)); transport_->notifyPendingWriteOnConnection(&writeCallback_); diff --git a/quic/server/test/QuicServerTransportTest.cpp b/quic/server/test/QuicServerTransportTest.cpp index b81a6fc76..48210fd69 100644 --- a/quic/server/test/QuicServerTransportTest.cpp +++ b/quic/server/test/QuicServerTransportTest.cpp @@ -34,6 +34,7 @@ namespace quic { namespace test { namespace { +using ByteEvent = QuicTransportBase::ByteEvent; using PacketDropReason = QuicTransportStatsCallback::PacketDropReason; } // namespace @@ -2054,6 +2055,32 @@ TEST_F(QuicServerTransportTest, DestroyWithoutClosing) { server.reset(); } +TEST_F(QuicServerTransportTest, DestroyWithoutClosingCancelByteEvents) { + StreamId streamId = server->createBidirectionalStream().value(); + + MockReadCallback readCb; + server->setReadCallback(streamId, &readCb); + + EXPECT_CALL(connCallback, onConnectionError(_)).Times(0); + EXPECT_CALL(connCallback, onConnectionEnd()).Times(0); + auto write = IOBuf::copyBuffer("no"); + server->writeChain(streamId, write->clone(), true, false); + + MockByteEventCallback txCallback; + MockByteEventCallback deliveryCallback; + + server->registerByteEventCallback( + ByteEvent::Type::TX, streamId, 0, &txCallback); + server->registerByteEventCallback( + ByteEvent::Type::ACK, streamId, 0, &deliveryCallback); + + EXPECT_CALL(txCallback, onByteEventCanceled(_)); + EXPECT_CALL(deliveryCallback, onByteEventCanceled(_)); + EXPECT_CALL(readCb, readError(_, _)); + + server.reset(); +} + TEST_F(QuicServerTransportTest, SetCongestionControl) { // Default: Cubic auto cc = server->getConn().congestionController.get(); diff --git a/quic/state/QuicStreamFunctions.cpp b/quic/state/QuicStreamFunctions.cpp index dcd9def5b..c1d735300 100644 --- a/quic/state/QuicStreamFunctions.cpp +++ b/quic/state/QuicStreamFunctions.cpp @@ -381,6 +381,16 @@ uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream) { stream.currentWriteOffset + stream.writeBuffer.chainLength()); } +folly::Optional getLargestWriteOffsetTxed( + const QuicStreamState& stream) { + // currentWriteOffset is really nextWriteOffset + // when 0, it indicates nothing has been written yet + if (stream.currentWriteOffset == 0) { + return folly::none; + } + return stream.currentWriteOffset - 1; +} + folly::Optional getLargestDeliverableOffset( const QuicStreamState& stream) { // If the acked intervals is not empty, then the furthest acked interval diff --git a/quic/state/QuicStreamFunctions.h b/quic/state/QuicStreamFunctions.h index bd7e59930..1c363e740 100644 --- a/quic/state/QuicStreamFunctions.h +++ b/quic/state/QuicStreamFunctions.h @@ -92,6 +92,14 @@ void appendPendingStreamReset( */ uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream); +/** + * Get the largest write offset the stream has transmitted / written to socket. + * + * If no bytes have been written to the socket yet, returns folly::none. + */ +folly::Optional getLargestWriteOffsetTxed( + const QuicStreamState& stream); + /** * Get the the highest acked offset (if any) that we can execute delivery * callbacks on. diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index b84026c39..a4fa3c10c 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -422,6 +422,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { writableControlStreams_.erase(streamId); blockedStreams_.erase(streamId); deliverableStreams_.erase(streamId); + txStreams_.erase(streamId); windowUpdates_.erase(streamId); lossStreams_.erase(streamId); stopSendingStreams_.erase(streamId); diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index 3acfb336a..08335e973 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -451,11 +451,10 @@ class QuicStreamManager { auto itr = deliverableStreams_.begin(); if (itr == deliverableStreams_.end()) { return folly::none; - } else { - StreamId ret = *itr; - deliverableStreams_.erase(itr); - return ret; } + StreamId ret = *itr; + deliverableStreams_.erase(itr); + return ret; } /* @@ -472,6 +471,55 @@ class QuicStreamManager { return deliverableStreams_.count(streamId) > 0; } + /* + * Returns a const reference to the underlying TX streams container. + */ + FOLLY_NODISCARD const auto& txStreams() const { + return txStreams_; + } + + /* + * Add a stream to list of streams that have transmitted. + */ + void addTx(StreamId streamId) { + txStreams_.insert(streamId); + } + + /* + * Remove a TX stream. + */ + void removeTx(StreamId streamId) { + txStreams_.erase(streamId); + } + + /* + * Pop a TX stream id and return it. + */ + folly::Optional popTx() { + auto itr = txStreams_.begin(); + if (itr == txStreams_.end()) { + return folly::none; + } else { + StreamId ret = *itr; + txStreams_.erase(itr); + return ret; + } + } + + /* + * Returns if there are any TX streams. + */ + FOLLY_NODISCARD bool hasTx() const { + return !txStreams_.empty(); + } + + /* + * Returns if the stream is in the TX container. + */ + FOLLY_NODISCARD bool txContains(StreamId streamId) const { + return txStreams_.count(streamId) > 0; + } + /* * Returns a const reference to the underlying data rejected streams * container. @@ -708,6 +756,7 @@ class QuicStreamManager { */ void clearActionable() { deliverableStreams_.clear(); + txStreams_.clear(); readableStreams_.clear(); peekableStreams_.clear(); flowControlUpdated_.clear(); @@ -845,6 +894,9 @@ class QuicStreamManager { // Set of control streams that have writable data std::set writableControlStreams_; + // Streams that may be able to call TxCallback + folly::F14FastSet txStreams_; + // Streams that may be able to callback DeliveryCallback folly::F14FastSet deliverableStreams_; diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index e1cf41249..36346ad6d 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -1880,6 +1880,38 @@ TEST_F(QuicStreamFunctionsTest, LargestWriteOffsetSeenNoFIN) { EXPECT_EQ(120, getLargestWriteOffsetSeen(stream)); } +TEST_F(QuicStreamFunctionsTest, StreamLargestWriteOffsetTxedNothingTxed) { + QuicStreamState stream(3, conn); + stream.currentWriteOffset = 0; + EXPECT_EQ(folly::none, getLargestWriteOffsetTxed(stream)); +} + +TEST_F(QuicStreamFunctionsTest, StreamLargestWriteOffsetTxedOneByteTxed) { + QuicStreamState stream(3, conn); + stream.currentWriteOffset = 1; + ASSERT_TRUE(getLargestWriteOffsetTxed(stream).has_value()); + EXPECT_EQ(0, getLargestWriteOffsetTxed(stream).value()); +} + +TEST_F(QuicStreamFunctionsTest, StreamLargestWriteOffsetTxedHundredBytesTxed) { + QuicStreamState stream(3, conn); + stream.currentWriteOffset = 100; + ASSERT_TRUE(getLargestWriteOffsetTxed(stream).has_value()); + EXPECT_EQ(99, getLargestWriteOffsetTxed(stream).value()); +} + +TEST_F( + QuicStreamFunctionsTest, + StreamLargestWriteOffsetTxedIgnoreFinalWriteOffset) { + // finalWriteOffset is set when writeChain is called with EoR, but we should + // always use currentWriteOffset to determine how many bytes have been TXed + QuicStreamState stream(3, conn); + stream.currentWriteOffset = 10; + stream.finalWriteOffset = 100; + ASSERT_TRUE(getLargestWriteOffsetTxed(stream).has_value()); + EXPECT_EQ(9, getLargestWriteOffsetTxed(stream).value()); +} + TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverNothingAcked) { QuicStreamState stream(3, conn); stream.currentWriteOffset = 100;