1
0
mirror of https://github.com/facebook/proxygen.git synced 2025-08-17 13:21:04 +03:00

Check for the sock_ before clearing the callbacks

Reviewed By: lnicco

Differential Revision: D15670952

fbshipit-source-id: 44537e917b79b94d34413438357fcef94556ad90
This commit is contained in:
Omer Shapira
2019-06-09 20:18:19 -07:00
committed by Facebook Github Bot
parent 69dfa36563
commit 31dc592daf
2 changed files with 53 additions and 16 deletions

View File

@@ -1496,12 +1496,16 @@ size_t HQSession::cleanupPendingStreams() {
} }
void HQSession::clearStreamCallbacks(quic::StreamId id) { void HQSession::clearStreamCallbacks(quic::StreamId id) {
sock_->setReadCallback(id, nullptr); if (sock_) {
sock_->setPeekCallback(id, nullptr); sock_->setReadCallback(id, nullptr);
sock_->setPeekCallback(id, nullptr);
if (isPartialReliabilityEnabled()) { if (isPartialReliabilityEnabled()) {
sock_->setDataExpiredCallback(id, nullptr); sock_->setDataExpiredCallback(id, nullptr);
sock_->setDataRejectedCallback(id, nullptr); sock_->setDataRejectedCallback(id, nullptr);
}
} else {
VLOG(4) << "Attempt to clear callbacks on closed socket";
} }
} }

View File

@@ -518,6 +518,16 @@ TEST_P(HQUpstreamSessionTest, DropConnectionTwice) {
hqSession_->dropConnection(); hqSession_->dropConnection();
} }
TEST_P(HQUpstreamSessionTest, DropConnectionTwiceWithPendingStreams) {
folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()};
socketDriver_->addReadEvent(15, writeBuf.move());
flushAndLoopN(1);
HQSession::DestructorGuard dg(hqSession_);
hqSession_->dropConnection();
eventBase_.loopOnce();
hqSession_->closeWhenIdle();
}
TEST_P(HQUpstreamSessionTest, NotifyConnectCallbackBeforeDestruct) { TEST_P(HQUpstreamSessionTest, NotifyConnectCallbackBeforeDestruct) {
MockConnectCallback connectCb; MockConnectCallback connectCb;
dynamic_cast<HQUpstreamSession*>(hqSession_)->setConnectCallback(&connectCb); dynamic_cast<HQUpstreamSession*>(hqSession_)->setConnectCallback(&connectCb);
@@ -1127,15 +1137,18 @@ class HQUpstreamSessionTestHQPush : public HQUpstreamSessionTest {
outHeaderSize); outHeaderSize);
} }
// Shared implementation for different push stream // Shared implementation for different push stream
// methods // methods
ServerStream& createPushStreamImpl(quic::StreamId streamId, ServerStream& createPushStreamImpl(quic::StreamId streamId,
hq::PushId pushId, folly::Optional<hq::PushId> pushId,
std::size_t len = kUnlimited, std::size_t len = kUnlimited,
bool eom = true) { bool eom = true) {
CHECK(hq::isInternalPushId(pushId)) if (pushId.hasValue()) {
CHECK(hq::isInternalPushId(*pushId))
<< "Expecting the push id to be in the internal representation"; << "Expecting the push id to be in the internal representation";
}
auto c = makeCodec(streamId); auto c = makeCodec(streamId);
// Setting a push id allows us to send push preface // Setting a push id allows us to send push preface
@@ -1151,9 +1164,11 @@ class HQUpstreamSessionTestHQPush : public HQUpstreamSessionTest {
// Generate the push stream preface, and if there's enough headroom // Generate the push stream preface, and if there's enough headroom
// the unframed push id that follows it // the unframed push id that follows it
auto prefaceRes = writePushStreamPreface(stream.id, len); auto prefaceRes = writePushStreamPreface(stream.id, len);
if (prefaceRes) { if (pushId.hasValue()) {
len -= *prefaceRes; if (prefaceRes) {
writeUnframedPushId(stream.id, len, pushId); len -= *prefaceRes;
writeUnframedPushId(stream.id, len, *pushId);
}
} }
return stream; return stream;
@@ -1192,7 +1207,7 @@ class HQUpstreamSessionTestHQPush : public HQUpstreamSessionTest {
// Create nascent stream (no body) // Create nascent stream (no body)
void createNascentPushStream(quic::StreamId streamId, void createNascentPushStream(quic::StreamId streamId,
hq::PushId pushId, folly::Optional<hq::PushId> pushId,
std::size_t len = kUnlimited, std::size_t len = kUnlimited,
bool eom = true) { bool eom = true) {
createPushStreamImpl(streamId, pushId, len, eom); createPushStreamImpl(streamId, pushId, len, eom);
@@ -1461,6 +1476,24 @@ TEST_P(HQUpstreamSessionTestHQPush, TestOnPushedTransactionOutOfOrder) {
flushAndLoop(); flushAndLoop();
} }
TEST_P(HQUpstreamSessionTestHQPush, TestCloseDroppedConnection) {
HQSession::DestructorGuard dg(hqSession_);
// Two "onError" calls are expected:
// the first when MockQuicSocketDriver closes the socket
// the second when the error is propagated to the stream
EXPECT_CALL(*assocHandler_, onError(testing::_)).Times(2);
// Create a nascent push stream with a preface only
createNascentPushStream(1111 /* streamId */, folly::none /* pushId */);
// Run the event loop to let the dispatcher register the nascent stream
flushAndLoop();
// Drop the connection
hqSession_->dropConnection();
flushAndLoop();
}
TEST_P(HQUpstreamSessionTestHQPush, TestOrphanedPushStream) { TEST_P(HQUpstreamSessionTestHQPush, TestOrphanedPushStream) {
// the transaction is expected to timeout, since the PushPromise does not have // the transaction is expected to timeout, since the PushPromise does not have
// EOF set, and it is not followed by a PushStream. // EOF set, and it is not followed by a PushStream.
@@ -1537,16 +1570,16 @@ INSTANTIATE_TEST_CASE_P(
// Instantiate tests for H3 Push functionality (requires HQ) // Instantiate tests for H3 Push functionality (requires HQ)
INSTANTIATE_TEST_CASE_P(HQUpstreamSessionTest, INSTANTIATE_TEST_CASE_P(HQUpstreamSessionTest,
HQUpstreamSessionTestHQPush, HQUpstreamSessionTestHQPush,
Values(TestParams({.alpn_ = "h3"})), Values(TestParams({
.alpn_ = "h3",
.unidirectionalStreamsCredit = 4
})),
paramsToTestName); paramsToTestName);
INSTANTIATE_TEST_CASE_P( INSTANTIATE_TEST_CASE_P(
HQUpstreamSessionTest, HQUpstreamSessionTest,
HQUpstreamSessionTestIngressHQPush, HQUpstreamSessionTestIngressHQPush,
Values(TestParams({.alpn_ = "h3", .numBytesOnPushStream = 8}), Values(TestParams({
TestParams({.alpn_ = "h3", .numBytesOnPushStream = 15}),
TestParams({.alpn_ = "h3", .numBytesOnPushStream = 16}),
TestParams({
.alpn_ = "h3", .alpn_ = "h3",
.unidirectionalStreamsCredit = 4, .unidirectionalStreamsCredit = 4,
.numBytesOnPushStream = 8, .numBytesOnPushStream = 8,