diff --git a/proxygen/lib/http/session/HQSession.cpp b/proxygen/lib/http/session/HQSession.cpp index 3061559c0..03882b1df 100644 --- a/proxygen/lib/http/session/HQSession.cpp +++ b/proxygen/lib/http/session/HQSession.cpp @@ -2521,9 +2521,15 @@ void HQSession::detachStreamTransport(HQStreamTransportBase* hqStream) { eraseUnboundStream(hqStream); } - // If there are no established streams left, close the connection if (getNumStreams() == 0) { - cleanupPendingStreams(); + folly::Optional quicVersion; + if (sock_ && sock_->getState() && sock_->getState()->version.has_value()) { + quicVersion = sock_->getState()->version.value(); + } + if (quicVersion.has_value() && + (quicVersion.value() != quic::QuicVersion::MVFST_ALIAS2)) { + cleanupPendingStreams(); + } if (infoCallback_) { infoCallback_->onDeactivateConnection(*this); } diff --git a/proxygen/lib/http/session/test/HQDownstreamSessionTest.cpp b/proxygen/lib/http/session/test/HQDownstreamSessionTest.cpp index 005faf8ff..c05acc6bf 100644 --- a/proxygen/lib/http/session/test/HQDownstreamSessionTest.cpp +++ b/proxygen/lib/http/session/test/HQDownstreamSessionTest.cpp @@ -2319,6 +2319,53 @@ TEST_P(HQDownstreamSessionTestHQ, DelayedQPACKStopSendingReset) { hqSession_->closeWhenIdle(); } +using ControlStreamsStallTest = HQDownstreamSessionBeforeTransportReadyTest; + +TEST_P(ControlStreamsStallTest, StalledQpackStream) { + // This test does not pre-setup any control streams for a reason. + // We want to be able to control unidirectional (QPACK) stream lifetime to + // perform the test. + + QuicConnectionStateBase state(quic::QuicNodeType::Server); + state.version = quic::QuicVersion::MVFST_ALIAS2; + EXPECT_CALL(*(socketDriver_->sock_), getState()) + .WillRepeatedly(testing::Invoke( + [&state]() -> QuicConnectionStateBase* { return &state; })); + + // Get a request going. + auto req = getGetRequest(); + req.getHeaders().add("X-FB-Debug", "rfccffgvtvnenjkbtitkfdufddnvbecu"); + auto id = sendRequest(req); + + // Add data to the control stream 6 with fake stream offset 42. + // This is to ensure unidirectional stream dispatcher in session does not + // instantiate an actual full blown control stream object, but only create a + // pending tmp stream placeholder and attach a peek callback to it to wait for + // the preface. + std::array data1{0b00100111}; + auto buf1 = folly::IOBuf::copyBuffer(data1.data(), data1.size()); + socketDriver_->addReadEvent(6, std::move(buf1), milliseconds(0), 42); + + // Let the sesion roll. + hqSession_->onTransportReady(); + + // Now cancel the request we queded up in the beginning. + socketDriver_->addStopSending(id, HTTP3::ErrorCode::HTTP_REQUEST_CANCELLED); + socketDriver_->addReadError(id, + HTTP3::ErrorCode::HTTP_REQUEST_CANCELLED, + std::chrono::milliseconds(0)); + + // Roll the event base. Once session loops, the expectation is that the + // pending control stream is alive and well and has its peek callback + // attached. + flushRequestsAndLoopN(1); + + EXPECT_NE(socketDriver_->streams_[6].peekCB, nullptr); + + // Close the session; all good. + hqSession_->closeWhenIdle(); +} + TEST_P(HQDownstreamSessionTestHQ, QPACKHeadersTooLarge) { hqSession_->setEgressSettings({{SettingsId::MAX_HEADER_LIST_SIZE, 60}}); auto req = getGetRequest(); @@ -3144,6 +3191,16 @@ TEST_P(HQDownstreamSessionTestHQPush, StopSending) { using DropConnectionInTransportReadyTest = HQDownstreamSessionBeforeTransportReadyTest; +INSTANTIATE_TEST_SUITE_P(ControlStreamsStallTest, + ControlStreamsStallTest, + Values([] { + TestParams tp; + tp.alpn_ = "h3"; + tp.checkUniridStreamCallbacks = false; + return tp; + }()), + paramsToTestName); + INSTANTIATE_TEST_SUITE_P(DropConnectionInTransportReadyTest, DropConnectionInTransportReadyTest, Values( diff --git a/proxygen/lib/http/session/test/HQSessionTestCommon.h b/proxygen/lib/http/session/test/HQSessionTestCommon.h index 1d8242c0f..750f9fc64 100644 --- a/proxygen/lib/http/session/test/HQSessionTestCommon.h +++ b/proxygen/lib/http/session/test/HQSessionTestCommon.h @@ -49,6 +49,7 @@ struct TestParams { std::size_t numBytesOnPushStream{kUnlimited}; bool expectOnTransportReady{true}; bool datagrams_{false}; + bool checkUniridStreamCallbacks{true}; }; std::string prBodyScriptToName(const std::vector& bodyScript); @@ -158,7 +159,8 @@ class HQSessionTest numCtrlStreams_ = ctrlStreamCount + qpackStreamCount; socketDriver_->setLocalAppCallback(this); - if (GetParam().unidirectionalStreamsCredit >= numCtrlStreams_) { + if (GetParam().checkUniridStreamCallbacks && + GetParam().unidirectionalStreamsCredit >= numCtrlStreams_) { auto dirModifier = (direction_ == proxygen::TransportDirection::DOWNSTREAM) ? 0 : 1; EXPECT_CALL(infoCb_, onWrite(testing::_, testing::_)) diff --git a/proxygen/lib/http/session/test/MockQuicSocketDriver.h b/proxygen/lib/http/session/test/MockQuicSocketDriver.h index 9ea960da6..825c5eeb0 100644 --- a/proxygen/lib/http/session/test/MockQuicSocketDriver.h +++ b/proxygen/lib/http/session/test/MockQuicSocketDriver.h @@ -92,6 +92,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { uint64_t flowControlWindow{65536}; bool isControl{false}; uint64_t lastSkipOffset{0}; + uint64_t fakePeekOffset{0}; }; public: @@ -1208,9 +1209,18 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { void addReadEvent(StreamId streamId, std::unique_ptr buf, std::chrono::milliseconds delayFromPrevious = - std::chrono::milliseconds(0)) { - addReadEventInternal( - streamId, std::move(buf), false, folly::none, delayFromPrevious); + std::chrono::milliseconds(0), + uint64_t fakePeekOffset = 0) { + addReadEventInternal(streamId, + std::move(buf), + false, + folly::none, + delayFromPrevious, + false /* stopSending */, + false /* datagramsAvailable */, + false /* pingReceived */, + false /* pingAcknowledged */, + fakePeekOffset); } void addReadEvent(StreamId streamId, @@ -1319,7 +1329,8 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { bool ss, bool da = false, bool pr = false, - bool pa = false) + bool pa = false, + uint64_t fpo = 0) : streamId(s), buf(std::move(b)), eof(e), @@ -1327,7 +1338,8 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { stopSending(ss), datagramsAvailable(da), pingReceived(pr), - pingAcknowledged(pa) { + pingAcknowledged(pa), + fakePeekOffset(fpo) { } StreamId streamId; @@ -1338,6 +1350,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { bool datagramsAvailable; bool pingReceived; bool pingAcknowledged; + uint64_t fakePeekOffset; }; void addReadEventInternal(StreamId streamId, @@ -1349,7 +1362,8 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { bool stopSending = false, bool datagramsAvailable = false, bool pingReceived = false, - bool pingAcknowledged = false) { + bool pingAcknowledged = false, + uint64_t fakePeekOffset = 0) { std::vector events; events.emplace_back(streamId, std::move(buf), @@ -1358,7 +1372,8 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { stopSending, datagramsAvailable, pingReceived, - pingAcknowledged); + pingAcknowledged, + fakePeekOffset); addReadEvents(std::move(events), delayFromPrevious); } @@ -1384,6 +1399,9 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { } for (auto& event : events) { auto& stream = streams_[event.streamId]; + if (event.fakePeekOffset > 0) { + stream.fakePeekOffset = event.fakePeekOffset; + } if (!event.error) { ERROR_IF(stream.readState == CLOSED, fmt::format("scheduling event on CLOSED streamId={}", @@ -1453,8 +1471,11 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { std::deque fakeReadBuffer; stream.readBuf.gather(stream.readBuf.chainLength()); auto copyBuf = stream.readBuf.front()->clone(); - fakeReadBuffer.emplace_back( - std::move(copyBuf), stream.readOffset, stream.readEOF); + fakeReadBuffer.emplace_back(std::move(copyBuf), + stream.fakePeekOffset + ? stream.fakePeekOffset + : stream.readOffset, + stream.readEOF); stream.peekCB->onDataAvailable( event.streamId, folly::Range(fakeReadBuffer.cbegin(), @@ -1615,8 +1636,11 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { copyBufLen, it.first), continue); - fakeReadBuffer.emplace_back( - std::move(copyBuf), it.second.readOffset, it.second.readEOF); + fakeReadBuffer.emplace_back(std::move(copyBuf), + it.second.fakePeekOffset + ? it.second.fakePeekOffset + : it.second.readOffset, + it.second.readEOF); it.second.peekCB->onDataAvailable( it.first, folly::Range(fakeReadBuffer.cbegin(),