diff --git a/proxygen/lib/http/session/HQSession.cpp b/proxygen/lib/http/session/HQSession.cpp index 8bf755687..442b8acb4 100644 --- a/proxygen/lib/http/session/HQSession.cpp +++ b/proxygen/lib/http/session/HQSession.cpp @@ -1496,12 +1496,16 @@ size_t HQSession::cleanupPendingStreams() { } void HQSession::clearStreamCallbacks(quic::StreamId id) { - sock_->setReadCallback(id, nullptr); - sock_->setPeekCallback(id, nullptr); + if (sock_) { + sock_->setReadCallback(id, nullptr); + sock_->setPeekCallback(id, nullptr); - if (isPartialReliabilityEnabled()) { - sock_->setDataExpiredCallback(id, nullptr); - sock_->setDataRejectedCallback(id, nullptr); + if (isPartialReliabilityEnabled()) { + sock_->setDataExpiredCallback(id, nullptr); + sock_->setDataRejectedCallback(id, nullptr); + } + } else { + VLOG(4) << "Attempt to clear callbacks on closed socket"; } } diff --git a/proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp b/proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp index d4fef32f7..da38488e3 100644 --- a/proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp +++ b/proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp @@ -518,6 +518,16 @@ TEST_P(HQUpstreamSessionTest, DropConnectionTwice) { hqSession_->dropConnection(); } +TEST_P(HQUpstreamSessionTest, DropConnectionTwiceWithPendingStreams) { + folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; + socketDriver_->addReadEvent(15, writeBuf.move()); + flushAndLoopN(1); + HQSession::DestructorGuard dg(hqSession_); + hqSession_->dropConnection(); + eventBase_.loopOnce(); + hqSession_->closeWhenIdle(); +} + TEST_P(HQUpstreamSessionTest, NotifyConnectCallbackBeforeDestruct) { MockConnectCallback connectCb; dynamic_cast(hqSession_)->setConnectCallback(&connectCb); @@ -1127,15 +1137,18 @@ class HQUpstreamSessionTestHQPush : public HQUpstreamSessionTest { outHeaderSize); } + // Shared implementation for different push stream // methods ServerStream& createPushStreamImpl(quic::StreamId streamId, - hq::PushId pushId, + folly::Optional pushId, std::size_t len = kUnlimited, bool eom = true) { - CHECK(hq::isInternalPushId(pushId)) + if (pushId.hasValue()) { + CHECK(hq::isInternalPushId(*pushId)) << "Expecting the push id to be in the internal representation"; + } auto c = makeCodec(streamId); // Setting a push id allows us to send push preface @@ -1151,9 +1164,11 @@ class HQUpstreamSessionTestHQPush : public HQUpstreamSessionTest { // Generate the push stream preface, and if there's enough headroom // the unframed push id that follows it auto prefaceRes = writePushStreamPreface(stream.id, len); - if (prefaceRes) { - len -= *prefaceRes; - writeUnframedPushId(stream.id, len, pushId); + if (pushId.hasValue()) { + if (prefaceRes) { + len -= *prefaceRes; + writeUnframedPushId(stream.id, len, *pushId); + } } return stream; @@ -1192,7 +1207,7 @@ class HQUpstreamSessionTestHQPush : public HQUpstreamSessionTest { // Create nascent stream (no body) void createNascentPushStream(quic::StreamId streamId, - hq::PushId pushId, + folly::Optional pushId, std::size_t len = kUnlimited, bool eom = true) { createPushStreamImpl(streamId, pushId, len, eom); @@ -1461,6 +1476,24 @@ TEST_P(HQUpstreamSessionTestHQPush, TestOnPushedTransactionOutOfOrder) { flushAndLoop(); } +TEST_P(HQUpstreamSessionTestHQPush, TestCloseDroppedConnection) { + HQSession::DestructorGuard dg(hqSession_); + // Two "onError" calls are expected: + // the first when MockQuicSocketDriver closes the socket + // the second when the error is propagated to the stream + EXPECT_CALL(*assocHandler_, onError(testing::_)).Times(2); + + // Create a nascent push stream with a preface only + createNascentPushStream(1111 /* streamId */, folly::none /* pushId */); + + // Run the event loop to let the dispatcher register the nascent stream + flushAndLoop(); + + // Drop the connection + hqSession_->dropConnection(); + flushAndLoop(); +} + TEST_P(HQUpstreamSessionTestHQPush, TestOrphanedPushStream) { // the transaction is expected to timeout, since the PushPromise does not have // EOF set, and it is not followed by a PushStream. @@ -1537,16 +1570,16 @@ INSTANTIATE_TEST_CASE_P( // Instantiate tests for H3 Push functionality (requires HQ) INSTANTIATE_TEST_CASE_P(HQUpstreamSessionTest, HQUpstreamSessionTestHQPush, - Values(TestParams({.alpn_ = "h3"})), + Values(TestParams({ + .alpn_ = "h3", + .unidirectionalStreamsCredit = 4 + })), paramsToTestName); INSTANTIATE_TEST_CASE_P( HQUpstreamSessionTest, HQUpstreamSessionTestIngressHQPush, - Values(TestParams({.alpn_ = "h3", .numBytesOnPushStream = 8}), - TestParams({.alpn_ = "h3", .numBytesOnPushStream = 15}), - TestParams({.alpn_ = "h3", .numBytesOnPushStream = 16}), - TestParams({ + Values(TestParams({ .alpn_ = "h3", .unidirectionalStreamsCredit = 4, .numBytesOnPushStream = 8,