diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index b70728a59..a1255ef24 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -458,8 +458,7 @@ class QuicSocket { virtual void unsetAllPeekCallbacks() = 0; /** - * Convenience function that sets the read callbacks of all streams to be - * nullptr. + * Convenience function that cancels delivery callbacks of all streams. */ virtual void unsetAllDeliveryCallbacks() = 0; @@ -787,15 +786,57 @@ class QuicSocket { virtual folly::Expected unregisterStreamWriteCallback(StreamId) = 0; + /** + * Structure used to communicate TX and ACK/Delivery notifications. + */ + struct ByteEvent { + enum class Type { ACK = 1 }; + + StreamId id{0}; + uint64_t offset{0}; + Type type; + + // sRTT at time of event + // TODO(bschlinker): Deprecate, caller can fetch transport state if desired. + std::chrono::microseconds srtt{0us}; + }; + + /** + * Structure used to communicate cancellation of a ByteEvent. + * + * According to Dictionary.com, cancellation is more frequent in American + * English than cancelation. Yet in American English, the preferred style is + * typically not to double the final L, so cancel generally becomes canceled. + */ + using ByteEventCancellation = ByteEvent; + + /** + * Callback class for receiving byte event (TX/ACK) notifications. + */ + class ByteEventCallback { + public: + virtual ~ByteEventCallback() = default; + + /** + * Invoked when the byte event has occurred. + */ + virtual void onByteEvent(ByteEvent byteEvent) = 0; + + /** + * Invoked if byte event is canceled due to reset, shutdown, or other error. + */ + virtual void onByteEventCanceled(ByteEventCancellation cancellation) = 0; + }; + /** * Callback class for receiving ack notifications */ - class DeliveryCallback { + class DeliveryCallback : public ByteEventCallback { public: virtual ~DeliveryCallback() = default; /** - * Invoked when the peer has acknowledged the receipt of the specificed + * Invoked when the peer has acknowledged the receipt of the specified * offset. rtt is the current RTT estimate for the connection. */ virtual void onDeliveryAck( @@ -808,8 +849,63 @@ class QuicSocket { * delivered (due to a reset or other error). */ virtual void onCanceled(StreamId id, uint64_t offset) = 0; + + private: + // Temporary shim during transition to ByteEvent + void onByteEvent(ByteEvent byteEvent) final { + CHECK_EQ((int)ByteEvent::Type::ACK, (int)byteEvent.type); // sanity + onDeliveryAck(byteEvent.id, byteEvent.offset, byteEvent.srtt); + } + + // Temporary shim during transition to ByteEvent + void onByteEventCanceled(ByteEventCancellation cancellation) final { + CHECK_EQ((int)ByteEvent::Type::ACK, (int)cancellation.type); // sanity + onCanceled(cancellation.id, cancellation.offset); + } }; + /** + * Register a byte event to be triggered when specified event type occurs for + * the specified stream and offset. + */ + virtual folly::Expected + registerByteEventCallback( + const ByteEvent::Type type, + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb) = 0; + + /** + * Cancel byte event callbacks for given stream. + * + * If an offset is provided, cancels only callbacks with an offset less than + * or equal to the provided offset, otherwise cancels all callbacks. + */ + virtual void cancelByteEventCallbacksForStream( + const StreamId id, + const folly::Optional& offset = folly::none) = 0; + + /** + * Cancel byte event callbacks for given type and stream. + * + * If an offset is provided, cancels only callbacks with an offset less than + * or equal to the provided offset, otherwise cancels all callbacks. + */ + virtual void cancelByteEventCallbacksForStream( + const ByteEvent::Type type, + const StreamId id, + const folly::Optional& offset = folly::none) = 0; + + /** + * Cancel all byte event callbacks of all streams. + */ + virtual void cancelAllByteEventCallbacks() = 0; + + /** + * Cancel all byte event callbacks of all streams of the given type. + */ + virtual void cancelByteEventCallbacks(const ByteEvent::Type type) = 0; + /** * Write data/eof to the given stream. * @@ -830,12 +926,12 @@ class QuicSocket { /** * Register a callback to be invoked when the peer has acknowledged the - * given offset on the given stream + * given offset on the given stream. */ virtual folly::Expected registerDeliveryCallback( StreamId id, uint64_t offset, - DeliveryCallback* cb) = 0; + ByteEventCallback* cb) = 0; /** * Close the stream for writing. Equivalent to writeChain(id, nullptr, true). diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index e3d4b1a2a..9ca1214c1 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1033,9 +1033,9 @@ QuicTransportBase::sendDataExpired(StreamId id, uint64_t offset) { auto stream = conn_->streamManager->getStream(id); auto newOffset = advanceMinimumRetransmittableOffset(stream, offset); - // Invoke any delivery callbacks that are set for any offset below newOffset. + // Cancel byte event callbacks that are set for any offset below newOffset. if (newOffset) { - cancelDeliveryCallbacksForStream(id, *newOffset); + cancelByteEventCallbacksForStream(id, *newOffset); } updateWriteLooper(true); @@ -1101,7 +1101,7 @@ void QuicTransportBase::invokeDataRejectedCallbacks() { // Invoke any delivery callbacks that are set for any offset below newly set // minimumRetransmittableOffset. if (!stream->streamReadError) { - cancelDeliveryCallbacksForStream( + cancelByteEventCallbacksForStream( streamId, stream->minimumRetransmittableOffset); } @@ -1221,54 +1221,58 @@ void QuicTransportBase::updateWriteLooper(bool thisIteration) { } } -void QuicTransportBase::cancelDeliveryCallbacksForStream(StreamId streamId) { - if (isReceivingStream(conn_->nodeType, streamId)) { - return; - } - conn_->streamManager->removeDeliverable(streamId); - auto deliveryCallbackIter = deliveryCallbacks_.find(streamId); - if (deliveryCallbackIter == deliveryCallbacks_.end()) { - return; - } - while (!deliveryCallbackIter->second.empty()) { - auto deliveryCallback = deliveryCallbackIter->second.front(); - deliveryCallbackIter->second.pop_front(); - deliveryCallback.second->onCanceled(streamId, deliveryCallback.first); - if (closeState_ != CloseState::OPEN) { - // socket got closed - we can't use deliveryCallbackIter anymore, - // closeImpl should take care of delivering callbacks that are left in - // deliveryCallbackIter->second - return; - } - } - deliveryCallbacks_.erase(deliveryCallbackIter); +void QuicTransportBase::cancelDeliveryCallbacksForStream(StreamId id) { + cancelByteEventCallbacksForStream(ByteEvent::Type::ACK, id); } void QuicTransportBase::cancelDeliveryCallbacksForStream( - StreamId streamId, + StreamId id, uint64_t offset) { - if (isReceivingStream(conn_->nodeType, streamId)) { + cancelByteEventCallbacksForStream(ByteEvent::Type::ACK, id, offset); +} + +void QuicTransportBase::cancelByteEventCallbacksForStream( + const StreamId id, + const folly::Optional& offset) { + cancelByteEventCallbacksForStream(ByteEvent::Type::ACK, id, offset); +} + +void QuicTransportBase::cancelByteEventCallbacksForStream( + const ByteEvent::Type type, + const StreamId id, + const folly::Optional& offset) { + if (isReceivingStream(conn_->nodeType, id)) { return; } - auto deliveryCallbackIter = deliveryCallbacks_.find(streamId); - if (deliveryCallbackIter == deliveryCallbacks_.end()) { - conn_->streamManager->removeDeliverable(streamId); + auto& byteEventMap = getByteEventMap(type); + auto byteEventMapIt = byteEventMap.find(id); + if (byteEventMapIt == byteEventMap.end()) { + switch (type) { + case ByteEvent::Type::ACK: + conn_->streamManager->removeDeliverable(id); + break; + } return; } + auto& streamByteEvents = byteEventMapIt->second; // Callbacks are kept sorted by offset, so we can just walk the queue and // invoke those with offset below provided offset. - while (!deliveryCallbackIter->second.empty()) { - auto deliveryCallback = deliveryCallbackIter->second.front(); - auto& cbOffset = deliveryCallback.first; - if (cbOffset < offset) { - deliveryCallbackIter->second.pop_front(); - deliveryCallback.second->onCanceled(streamId, cbOffset); + while (!streamByteEvents.empty()) { + // decomposition not supported for xplat + const auto cbOffset = streamByteEvents.front().first; + const auto callback = streamByteEvents.front().second; + if (!offset.has_value() || cbOffset < *offset) { + streamByteEvents.pop_front(); + ByteEventCancellation cancellation = {}; + cancellation.id = id; + cancellation.offset = cbOffset; + cancellation.type = type; + callback->onByteEventCanceled(cancellation); if (closeState_ != CloseState::OPEN) { - // socket got closed - we can't use deliveryCallbackIter anymore, - // closeImpl should take care of delivering callbacks that are left in - // deliveryCallbackIter->second + // socket got closed - we can't use streamByteEvents anymore, + // closeImpl should take care of cleaning up any remaining callbacks return; } } else { @@ -1278,9 +1282,34 @@ void QuicTransportBase::cancelDeliveryCallbacksForStream( } // Clean up state for this stream if no callbacks left to invoke. - if (deliveryCallbackIter->second.empty()) { - conn_->streamManager->removeDeliverable(streamId); - deliveryCallbacks_.erase(deliveryCallbackIter); + if (streamByteEvents.empty()) { + switch (type) { + case ByteEvent::Type::ACK: + conn_->streamManager->removeDeliverable(id); + break; + } + byteEventMap.erase(byteEventMapIt); + } +} + +void QuicTransportBase::cancelAllByteEventCallbacks() { + cancelByteEventCallbacks(ByteEvent::Type::ACK); +} + +void QuicTransportBase::cancelByteEventCallbacks(const ByteEvent::Type type) { + ByteEventMap byteEventMap = std::move(getByteEventMap(type)); + for (const auto& byteEventMapIt : byteEventMap) { + const auto streamId = byteEventMapIt.first; + const auto callbackMap = byteEventMapIt.second; + for (const auto& callbackMapIt : callbackMap) { + const auto offset = callbackMapIt.first; + const auto callback = callbackMapIt.second; + ByteEventCancellation cancellation = {}; + cancellation.id = streamId; + cancellation.offset = offset; + cancellation.type = type; + callback->onByteEventCanceled(cancellation); + } } } @@ -1500,7 +1529,7 @@ void QuicTransportBase::processCallbacksAfterNetworkData() { for (auto pendingResetIt = conn_->pendingEvents.resets.begin(); pendingResetIt != conn_->pendingEvents.resets.end(); pendingResetIt++) { - cancelDeliveryCallbacksForStream(pendingResetIt->first); + cancelByteEventCallbacksForStream(pendingResetIt->first); if (closeState_ != CloseState::OPEN) { return; } @@ -1526,8 +1555,13 @@ void QuicTransportBase::processCallbacksAfterNetworkData() { deliveryCallbacksForAckedStream->second.pop_front(); auto currentDeliveryCallbackOffset = deliveryCallbackAndOffset.first; auto deliveryCallback = deliveryCallbackAndOffset.second; - deliveryCallback->onDeliveryAck( - streamId, currentDeliveryCallbackOffset, conn_->lossState.srtt); + + ByteEvent byteEvent = {}; + byteEvent.id = streamId; + byteEvent.offset = currentDeliveryCallbackOffset; + byteEvent.type = ByteEvent::Type::ACK; + byteEvent.srtt = conn_->lossState.srtt; + deliveryCallback->onByteEvent(byteEvent); if (closeState_ != CloseState::OPEN) { return; } @@ -1943,7 +1977,16 @@ folly::Expected QuicTransportBase::registerDeliveryCallback( StreamId id, uint64_t offset, - DeliveryCallback* cb) { + ByteEventCallback* cb) { + return registerByteEventCallback(ByteEvent::Type::ACK, id, offset, cb); +} + +folly::Expected +QuicTransportBase::registerByteEventCallback( + const ByteEvent::Type type, + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb) { if (isReceivingStream(conn_->nodeType, id)) { return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); } @@ -1954,48 +1997,64 @@ QuicTransportBase::registerDeliveryCallback( if (!conn_->streamManager->streamExists(id)) { return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); } - if (cb) { - auto deliveryCallbackIt = deliveryCallbacks_.find(id); - if (deliveryCallbackIt == deliveryCallbacks_.end()) { - deliveryCallbacks_.emplace( - id, - std::initializer_list({{offset, cb}})); - } else { - // Keep DeliveryCallbacks for the same stream sorted by offsets: - auto pos = std::upper_bound( - deliveryCallbackIt->second.begin(), - deliveryCallbackIt->second.end(), + if (!cb) { + return folly::unit; + } + + ByteEventMap& byteEventMap = getByteEventMap(type); + auto byteEventMapIt = byteEventMap.find(id); + if (byteEventMapIt == byteEventMap.end()) { + byteEventMap.emplace( + id, + std::initializer_list::type::mapped_type::value_type>({{offset, cb}})); + } else { + // Keep ByteEvents for the same stream sorted by offsets: + auto pos = std::upper_bound( + byteEventMapIt->second.begin(), + byteEventMapIt->second.end(), + offset, + [&](uint64_t o, const std::pair& p) { + return o < p.first; + }); + byteEventMapIt->second.emplace(pos, offset, cb); + } + auto stream = conn_->streamManager->getStream(id); + + folly::Optional nextOffsetToWait; + switch (type) { + case ByteEvent::Type::ACK: + nextOffsetToWait = getLargestDeliverableOffset(*stream); + break; + } + if (nextOffsetToWait.has_value() && (offset < *nextOffsetToWait)) { + // This byte event has already occurred + runOnEvbAsync([id, cb, offset, type](auto selfObj) { + if (selfObj->closeState_ != CloseState::OPEN) { + // Close will error out all byte event callbacks. + return; + } + + auto& byteEventMapL = selfObj->getByteEventMap(type); + auto streamByteEventCbIt = byteEventMapL.find(id); + if (streamByteEventCbIt == byteEventMapL.end()) { + return; + } + auto pos = std::lower_bound( + streamByteEventCbIt->second.begin(), + streamByteEventCbIt->second.end(), offset, - [&](uint64_t o, const std::pair& p) { - return o < p.first; + [&](const std::pair& p, uint64_t o) { + return p.first < o; }); - deliveryCallbackIt->second.emplace(pos, offset, cb); - } - auto stream = conn_->streamManager->getStream(id); - auto maxOffsetToDeliver = getLargestDeliverableOffset(*stream); - if (maxOffsetToDeliver.has_value() && (offset < *maxOffsetToDeliver)) { - // This offset is already delivered - runOnEvbAsync([id, cb, offset](auto selfObj) { - if (selfObj->closeState_ != CloseState::OPEN) { - // Close will error out all the delivery callbacks. - return; - } - auto streamDeliveryCbIt = selfObj->deliveryCallbacks_.find(id); - if (streamDeliveryCbIt == selfObj->deliveryCallbacks_.end()) { - return; - } - auto pos = std::lower_bound( - streamDeliveryCbIt->second.begin(), - streamDeliveryCbIt->second.end(), - offset, - [&](const std::pair& p, uint64_t o) { - return p.first < o; - }); - streamDeliveryCbIt->second.erase(pos); - cb->onDeliveryAck(id, offset, selfObj->conn_->lossState.srtt); - }); - } + streamByteEventCbIt->second.erase(pos); + + ByteEvent byteEvent = {}; + byteEvent.id = id; + byteEvent.offset = offset; + byteEvent.type = type; + cb->onByteEvent(byteEvent); + }); } return folly::unit; } @@ -2037,7 +2096,7 @@ folly::Expected QuicTransportBase::resetStream( closeState_ == CloseState::OPEN && pendingResetIt != conn_->pendingEvents.resets.end(); pendingResetIt++) { - cancelDeliveryCallbacksForStream(pendingResetIt->first); + cancelByteEventCallbacksForStream(pendingResetIt->first); } pendingWriteCallbacks_.erase(id); QUIC_STATS(conn_->statsCallback, onQuicStreamReset); @@ -2333,10 +2392,8 @@ void QuicTransportBase::cancelAllAppCallbacks( updateWriteLooper(true); }; conn_->streamManager->clearActionable(); - // Move the whole delivery callback map: - auto deliveryCallbacks = std::move(deliveryCallbacks_); - // Invoke onCanceled on the copy - cancelDeliveryCallbacks(deliveryCallbacks); + // Cancel any pending ByteEvent callbacks + cancelAllByteEventCallbacks(); // TODO: this will become simpler when we change the underlying data // structure of read callbacks. // TODO: this approach will make the app unable to setReadCallback to @@ -2506,29 +2563,6 @@ void QuicTransportBase::writeSocketDataAndCatch() { } } -void QuicTransportBase::cancelDeliveryCallbacks( - StreamId id, - const std::deque>& - deliveryCallbacks) { - for (auto iter = deliveryCallbacks.begin(); iter != deliveryCallbacks.end(); - iter++) { - auto currentDeliveryCallbackOffset = iter->first; - auto deliveryCallback = iter->second; - deliveryCallback->onCanceled(id, currentDeliveryCallbackOffset); - } -} - -void QuicTransportBase::cancelDeliveryCallbacks( - const folly::F14FastMap< - StreamId, - std::deque>>& - deliveryCallbacks) { - for (auto iter = deliveryCallbacks.begin(); iter != deliveryCallbacks.end(); - iter++) { - cancelDeliveryCallbacks(iter->first, iter->second); - } -} - void QuicTransportBase::setTransportSettings( TransportSettings transportSettings) { // If we've already encoded the transport parameters, silently return as @@ -2738,4 +2772,14 @@ QuicTransportBase::maybeResetStreamFromReadError( return folly::Expected(folly::unit); } +QuicTransportBase::ByteEventMap& QuicTransportBase::getByteEventMap( + const ByteEvent::Type type) { + switch (type) { + case ByteEvent::Type::ACK: + return deliveryCallbacks_; + } + LOG(FATAL) << "Unhandled case in getByteEventMap"; + folly::assume_unreachable(); +} + } // namespace quic diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index 90d7d7765..cba698f90 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -196,7 +196,7 @@ class QuicTransportBase : public QuicSocket { folly::Expected registerDeliveryCallback( StreamId id, uint64_t offset, - DeliveryCallback* cb) override; + ByteEventCallback* cb) override; folly::Optional shutdownWrite(StreamId id) override; @@ -238,27 +238,6 @@ class QuicTransportBase : public QuicSocket { folly::Optional setControlStream(StreamId id) override; - /** - * Invoke onCanceled for all the delivery callbacks in the deliveryCallbacks - * passed in. This is supposed to be a copy of the real deque of the delivery - * callbacks for the stream, so there is no need to pop anything off of it. - */ - static void cancelDeliveryCallbacks( - StreamId id, - const std::deque>& - deliveryCallbacks); - - /** - * Invoke onCanceled for all the delivery callbacks in the deliveryCallbacks - * map. This is supposed to be a copy of the real map of the delivery - * callbacks of the transport, so there is no need to erase anything from it. - */ - static void cancelDeliveryCallbacks( - const folly::F14FastMap< - StreamId, - std::deque>>& - deliveryCallbacks); - /** * Set the initial flow control window for the connection. */ @@ -326,14 +305,54 @@ class QuicTransportBase : public QuicSocket { /** * Invoke onCanceled on all the delivery callbacks registered for streamId. */ - void cancelDeliveryCallbacksForStream(StreamId streamId) override; + void cancelDeliveryCallbacksForStream(StreamId id) override; /** * Invoke onCanceled on all the delivery callbacks registered for streamId for * offsets lower than the offset provided. */ - void cancelDeliveryCallbacksForStream(StreamId streamId, uint64_t offset) - override; + void cancelDeliveryCallbacksForStream(StreamId id, uint64_t offset) override; + + /** + * Register a byte event to be triggered when specified event type occurs for + * the specified stream and offset. + */ + folly::Expected registerByteEventCallback( + const ByteEvent::Type type, + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb) override; + + /** + * Cancel byte event callbacks for given stream. + * + * If an offset is provided, cancels only callbacks with an offset less than + * or equal to the provided offset, otherwise cancels all callbacks. + */ + void cancelByteEventCallbacksForStream( + const StreamId id, + const folly::Optional& offset = folly::none) override; + + /** + * Cancel byte event callbacks for given type and stream. + * + * If an offset is provided, cancels only callbacks with an offset less than + * or equal to the provided offset, otherwise cancels all callbacks. + */ + void cancelByteEventCallbacksForStream( + const ByteEvent::Type type, + const StreamId id, + const folly::Optional& offset = folly::none) override; + + /** + * Cancel all byte event callbacks of all streams. + */ + void cancelAllByteEventCallbacks() override; + + /** + * Cancel all byte event callbacks of all streams of the given type. + */ + void cancelByteEventCallbacks(const ByteEvent::Type type) override; // Timeout functions class LossTimeout : public folly::HHWheelTimer::Callback { @@ -617,6 +636,10 @@ class QuicTransportBase : public QuicSocket { PingCallback* callback, std::chrono::milliseconds pingTimeout); + using ByteEventMap = folly:: + F14FastMap>>; + ByteEventMap& getByteEventMap(const ByteEvent::Type type); + std::atomic evb_; std::unique_ptr socket_; ConnectionCallback* connCallback_{nullptr}; @@ -656,9 +679,9 @@ class QuicTransportBase : public QuicSocket { folly::F14FastMap readCallbacks_; folly::F14FastMap peekCallbacks_; - folly:: - F14FastMap>> - deliveryCallbacks_; + + ByteEventMap deliveryCallbacks_; + folly::F14FastMap dataExpiredCallbacks_; folly::F14FastMap dataRejectedCallbacks_; PingCallback* pingCallback_; diff --git a/quic/api/test/MockQuicSocket.h b/quic/api/test/MockQuicSocket.h index 1a032c3ad..da46f349d 100644 --- a/quic/api/test/MockQuicSocket.h +++ b/quic/api/test/MockQuicSocket.h @@ -140,6 +140,24 @@ class MockQuicSocket : public QuicSocket { MOCK_METHOD1( unregisterStreamWriteCallback, folly::Expected(StreamId)); + MOCK_METHOD4( + registerByteEventCallback, + folly::Expected( + const ByteEvent::Type, + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb)); + MOCK_METHOD2( + cancelByteEventCallbacksForStream, + void(const StreamId id, const folly::Optional& offset)); + MOCK_METHOD3( + cancelByteEventCallbacksForStream, + void( + const ByteEvent::Type, + const StreamId id, + const folly::Optional& offset)); + MOCK_METHOD0(cancelAllByteEventCallbacks, void()); + MOCK_METHOD1(cancelByteEventCallbacks, void(const ByteEvent::Type)); folly::Expected writeChain( StreamId id, Buf data, @@ -157,7 +175,7 @@ class MockQuicSocket : public QuicSocket { folly::Expected( StreamId, uint64_t, - DeliveryCallback*)); + ByteEventCallback*)); MOCK_METHOD1(shutdownWrite, folly::Optional(StreamId)); MOCK_METHOD2( diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 89a44075d..dc846a285 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -353,7 +353,12 @@ class TestQuicTransport auto deliveryCb = deliveryCallbacks_.find(id); if (deliveryCb != deliveryCallbacks_.end()) { for (auto& cbs : deliveryCb->second) { - cbs.second->onDeliveryAck(id, cbs.first, stream->conn.lossState.srtt); + ByteEvent event = {}; + event.id = id; + event.offset = cbs.first; + event.type = ByteEvent::Type::ACK; + event.srtt = stream->conn.lossState.srtt; + cbs.second->onByteEvent(event); if (closeState_ != CloseState::OPEN) { break; } @@ -1314,32 +1319,6 @@ TEST_F(QuicTransportImplTest, CloseStreamAfterReadFin) { transport.reset(); } -TEST_F(QuicTransportImplTest, CancelAllDeliveryCallbacksDeque) { - NiceMock mockedDeliveryCallback1, - mockedDeliveryCallback2; - std::deque> callbacks; - callbacks.emplace_back(0, &mockedDeliveryCallback1); - callbacks.emplace_back(100, &mockedDeliveryCallback2); - StreamId id = 0x123; - EXPECT_CALL(mockedDeliveryCallback1, onCanceled(id, 0)).Times(1); - EXPECT_CALL(mockedDeliveryCallback2, onCanceled(id, 100)).Times(1); - TestQuicTransport::cancelDeliveryCallbacks(id, callbacks); -} - -TEST_F(QuicTransportImplTest, CancelAllDeliveryCallbacksMap) { - NiceMock mockedDeliveryCallback1, - mockedDeliveryCallback2; - folly::F14FastMap< - StreamId, - std::deque>> - callbacks; - callbacks[0x123].emplace_back(0, &mockedDeliveryCallback1); - callbacks[0x135].emplace_back(100, &mockedDeliveryCallback2); - EXPECT_CALL(mockedDeliveryCallback1, onCanceled(0x123, 0)).Times(1); - EXPECT_CALL(mockedDeliveryCallback2, onCanceled(0x135, 100)).Times(1); - TestQuicTransport::cancelDeliveryCallbacks(callbacks); -} - TEST_F(QuicTransportImplTest, CloseTransportCleansupOutstandingCounters) { transport->transportConn->outstandings.handshakePacketsCount = 200; transport->closeNow(folly::none);