mirror of
https://github.com/facebook/proxygen.git
synced 2025-08-13 03:42:24 +03:00
Do not strip pending control streams of callbacks prematurely
Summary: Do not strip pending control streams of callbacks when connection is "idle" and has 0 bidir streams (transactions) left Reviewed By: mjoras Differential Revision: D35334419 fbshipit-source-id: e2ec919432d85b35637ffb4b7c8216f2f89db1fb
This commit is contained in:
committed by
Facebook GitHub Bot
parent
048764ff85
commit
080c83ccbc
@@ -2521,9 +2521,15 @@ void HQSession::detachStreamTransport(HQStreamTransportBase* hqStream) {
|
|||||||
eraseUnboundStream(hqStream);
|
eraseUnboundStream(hqStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are no established streams left, close the connection
|
|
||||||
if (getNumStreams() == 0) {
|
if (getNumStreams() == 0) {
|
||||||
cleanupPendingStreams();
|
folly::Optional<quic::QuicVersion> quicVersion;
|
||||||
|
if (sock_ && sock_->getState() && sock_->getState()->version.has_value()) {
|
||||||
|
quicVersion = sock_->getState()->version.value();
|
||||||
|
}
|
||||||
|
if (quicVersion.has_value() &&
|
||||||
|
(quicVersion.value() != quic::QuicVersion::MVFST_ALIAS2)) {
|
||||||
|
cleanupPendingStreams();
|
||||||
|
}
|
||||||
if (infoCallback_) {
|
if (infoCallback_) {
|
||||||
infoCallback_->onDeactivateConnection(*this);
|
infoCallback_->onDeactivateConnection(*this);
|
||||||
}
|
}
|
||||||
|
@@ -2319,6 +2319,53 @@ TEST_P(HQDownstreamSessionTestHQ, DelayedQPACKStopSendingReset) {
|
|||||||
hqSession_->closeWhenIdle();
|
hqSession_->closeWhenIdle();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
using ControlStreamsStallTest = HQDownstreamSessionBeforeTransportReadyTest;
|
||||||
|
|
||||||
|
TEST_P(ControlStreamsStallTest, StalledQpackStream) {
|
||||||
|
// This test does not pre-setup any control streams for a reason.
|
||||||
|
// We want to be able to control unidirectional (QPACK) stream lifetime to
|
||||||
|
// perform the test.
|
||||||
|
|
||||||
|
QuicConnectionStateBase state(quic::QuicNodeType::Server);
|
||||||
|
state.version = quic::QuicVersion::MVFST_ALIAS2;
|
||||||
|
EXPECT_CALL(*(socketDriver_->sock_), getState())
|
||||||
|
.WillRepeatedly(testing::Invoke(
|
||||||
|
[&state]() -> QuicConnectionStateBase* { return &state; }));
|
||||||
|
|
||||||
|
// Get a request going.
|
||||||
|
auto req = getGetRequest();
|
||||||
|
req.getHeaders().add("X-FB-Debug", "rfccffgvtvnenjkbtitkfdufddnvbecu");
|
||||||
|
auto id = sendRequest(req);
|
||||||
|
|
||||||
|
// Add data to the control stream 6 with fake stream offset 42.
|
||||||
|
// This is to ensure unidirectional stream dispatcher in session does not
|
||||||
|
// instantiate an actual full blown control stream object, but only create a
|
||||||
|
// pending tmp stream placeholder and attach a peek callback to it to wait for
|
||||||
|
// the preface.
|
||||||
|
std::array<uint8_t, 1> data1{0b00100111};
|
||||||
|
auto buf1 = folly::IOBuf::copyBuffer(data1.data(), data1.size());
|
||||||
|
socketDriver_->addReadEvent(6, std::move(buf1), milliseconds(0), 42);
|
||||||
|
|
||||||
|
// Let the sesion roll.
|
||||||
|
hqSession_->onTransportReady();
|
||||||
|
|
||||||
|
// Now cancel the request we queded up in the beginning.
|
||||||
|
socketDriver_->addStopSending(id, HTTP3::ErrorCode::HTTP_REQUEST_CANCELLED);
|
||||||
|
socketDriver_->addReadError(id,
|
||||||
|
HTTP3::ErrorCode::HTTP_REQUEST_CANCELLED,
|
||||||
|
std::chrono::milliseconds(0));
|
||||||
|
|
||||||
|
// Roll the event base. Once session loops, the expectation is that the
|
||||||
|
// pending control stream is alive and well and has its peek callback
|
||||||
|
// attached.
|
||||||
|
flushRequestsAndLoopN(1);
|
||||||
|
|
||||||
|
EXPECT_NE(socketDriver_->streams_[6].peekCB, nullptr);
|
||||||
|
|
||||||
|
// Close the session; all good.
|
||||||
|
hqSession_->closeWhenIdle();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_P(HQDownstreamSessionTestHQ, QPACKHeadersTooLarge) {
|
TEST_P(HQDownstreamSessionTestHQ, QPACKHeadersTooLarge) {
|
||||||
hqSession_->setEgressSettings({{SettingsId::MAX_HEADER_LIST_SIZE, 60}});
|
hqSession_->setEgressSettings({{SettingsId::MAX_HEADER_LIST_SIZE, 60}});
|
||||||
auto req = getGetRequest();
|
auto req = getGetRequest();
|
||||||
@@ -3144,6 +3191,16 @@ TEST_P(HQDownstreamSessionTestHQPush, StopSending) {
|
|||||||
using DropConnectionInTransportReadyTest =
|
using DropConnectionInTransportReadyTest =
|
||||||
HQDownstreamSessionBeforeTransportReadyTest;
|
HQDownstreamSessionBeforeTransportReadyTest;
|
||||||
|
|
||||||
|
INSTANTIATE_TEST_SUITE_P(ControlStreamsStallTest,
|
||||||
|
ControlStreamsStallTest,
|
||||||
|
Values([] {
|
||||||
|
TestParams tp;
|
||||||
|
tp.alpn_ = "h3";
|
||||||
|
tp.checkUniridStreamCallbacks = false;
|
||||||
|
return tp;
|
||||||
|
}()),
|
||||||
|
paramsToTestName);
|
||||||
|
|
||||||
INSTANTIATE_TEST_SUITE_P(DropConnectionInTransportReadyTest,
|
INSTANTIATE_TEST_SUITE_P(DropConnectionInTransportReadyTest,
|
||||||
DropConnectionInTransportReadyTest,
|
DropConnectionInTransportReadyTest,
|
||||||
Values(
|
Values(
|
||||||
|
@@ -49,6 +49,7 @@ struct TestParams {
|
|||||||
std::size_t numBytesOnPushStream{kUnlimited};
|
std::size_t numBytesOnPushStream{kUnlimited};
|
||||||
bool expectOnTransportReady{true};
|
bool expectOnTransportReady{true};
|
||||||
bool datagrams_{false};
|
bool datagrams_{false};
|
||||||
|
bool checkUniridStreamCallbacks{true};
|
||||||
};
|
};
|
||||||
|
|
||||||
std::string prBodyScriptToName(const std::vector<uint8_t>& bodyScript);
|
std::string prBodyScriptToName(const std::vector<uint8_t>& bodyScript);
|
||||||
@@ -158,7 +159,8 @@ class HQSessionTest
|
|||||||
numCtrlStreams_ = ctrlStreamCount + qpackStreamCount;
|
numCtrlStreams_ = ctrlStreamCount + qpackStreamCount;
|
||||||
socketDriver_->setLocalAppCallback(this);
|
socketDriver_->setLocalAppCallback(this);
|
||||||
|
|
||||||
if (GetParam().unidirectionalStreamsCredit >= numCtrlStreams_) {
|
if (GetParam().checkUniridStreamCallbacks &&
|
||||||
|
GetParam().unidirectionalStreamsCredit >= numCtrlStreams_) {
|
||||||
auto dirModifier =
|
auto dirModifier =
|
||||||
(direction_ == proxygen::TransportDirection::DOWNSTREAM) ? 0 : 1;
|
(direction_ == proxygen::TransportDirection::DOWNSTREAM) ? 0 : 1;
|
||||||
EXPECT_CALL(infoCb_, onWrite(testing::_, testing::_))
|
EXPECT_CALL(infoCb_, onWrite(testing::_, testing::_))
|
||||||
|
@@ -92,6 +92,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
uint64_t flowControlWindow{65536};
|
uint64_t flowControlWindow{65536};
|
||||||
bool isControl{false};
|
bool isControl{false};
|
||||||
uint64_t lastSkipOffset{0};
|
uint64_t lastSkipOffset{0};
|
||||||
|
uint64_t fakePeekOffset{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@@ -1208,9 +1209,18 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
void addReadEvent(StreamId streamId,
|
void addReadEvent(StreamId streamId,
|
||||||
std::unique_ptr<folly::IOBuf> buf,
|
std::unique_ptr<folly::IOBuf> buf,
|
||||||
std::chrono::milliseconds delayFromPrevious =
|
std::chrono::milliseconds delayFromPrevious =
|
||||||
std::chrono::milliseconds(0)) {
|
std::chrono::milliseconds(0),
|
||||||
addReadEventInternal(
|
uint64_t fakePeekOffset = 0) {
|
||||||
streamId, std::move(buf), false, folly::none, delayFromPrevious);
|
addReadEventInternal(streamId,
|
||||||
|
std::move(buf),
|
||||||
|
false,
|
||||||
|
folly::none,
|
||||||
|
delayFromPrevious,
|
||||||
|
false /* stopSending */,
|
||||||
|
false /* datagramsAvailable */,
|
||||||
|
false /* pingReceived */,
|
||||||
|
false /* pingAcknowledged */,
|
||||||
|
fakePeekOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReadEvent(StreamId streamId,
|
void addReadEvent(StreamId streamId,
|
||||||
@@ -1319,7 +1329,8 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
bool ss,
|
bool ss,
|
||||||
bool da = false,
|
bool da = false,
|
||||||
bool pr = false,
|
bool pr = false,
|
||||||
bool pa = false)
|
bool pa = false,
|
||||||
|
uint64_t fpo = 0)
|
||||||
: streamId(s),
|
: streamId(s),
|
||||||
buf(std::move(b)),
|
buf(std::move(b)),
|
||||||
eof(e),
|
eof(e),
|
||||||
@@ -1327,7 +1338,8 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
stopSending(ss),
|
stopSending(ss),
|
||||||
datagramsAvailable(da),
|
datagramsAvailable(da),
|
||||||
pingReceived(pr),
|
pingReceived(pr),
|
||||||
pingAcknowledged(pa) {
|
pingAcknowledged(pa),
|
||||||
|
fakePeekOffset(fpo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
StreamId streamId;
|
StreamId streamId;
|
||||||
@@ -1338,6 +1350,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
bool datagramsAvailable;
|
bool datagramsAvailable;
|
||||||
bool pingReceived;
|
bool pingReceived;
|
||||||
bool pingAcknowledged;
|
bool pingAcknowledged;
|
||||||
|
uint64_t fakePeekOffset;
|
||||||
};
|
};
|
||||||
|
|
||||||
void addReadEventInternal(StreamId streamId,
|
void addReadEventInternal(StreamId streamId,
|
||||||
@@ -1349,7 +1362,8 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
bool stopSending = false,
|
bool stopSending = false,
|
||||||
bool datagramsAvailable = false,
|
bool datagramsAvailable = false,
|
||||||
bool pingReceived = false,
|
bool pingReceived = false,
|
||||||
bool pingAcknowledged = false) {
|
bool pingAcknowledged = false,
|
||||||
|
uint64_t fakePeekOffset = 0) {
|
||||||
std::vector<ReadEvent> events;
|
std::vector<ReadEvent> events;
|
||||||
events.emplace_back(streamId,
|
events.emplace_back(streamId,
|
||||||
std::move(buf),
|
std::move(buf),
|
||||||
@@ -1358,7 +1372,8 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
stopSending,
|
stopSending,
|
||||||
datagramsAvailable,
|
datagramsAvailable,
|
||||||
pingReceived,
|
pingReceived,
|
||||||
pingAcknowledged);
|
pingAcknowledged,
|
||||||
|
fakePeekOffset);
|
||||||
addReadEvents(std::move(events), delayFromPrevious);
|
addReadEvents(std::move(events), delayFromPrevious);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1384,6 +1399,9 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
}
|
}
|
||||||
for (auto& event : events) {
|
for (auto& event : events) {
|
||||||
auto& stream = streams_[event.streamId];
|
auto& stream = streams_[event.streamId];
|
||||||
|
if (event.fakePeekOffset > 0) {
|
||||||
|
stream.fakePeekOffset = event.fakePeekOffset;
|
||||||
|
}
|
||||||
if (!event.error) {
|
if (!event.error) {
|
||||||
ERROR_IF(stream.readState == CLOSED,
|
ERROR_IF(stream.readState == CLOSED,
|
||||||
fmt::format("scheduling event on CLOSED streamId={}",
|
fmt::format("scheduling event on CLOSED streamId={}",
|
||||||
@@ -1453,8 +1471,11 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
std::deque<StreamBuffer> fakeReadBuffer;
|
std::deque<StreamBuffer> fakeReadBuffer;
|
||||||
stream.readBuf.gather(stream.readBuf.chainLength());
|
stream.readBuf.gather(stream.readBuf.chainLength());
|
||||||
auto copyBuf = stream.readBuf.front()->clone();
|
auto copyBuf = stream.readBuf.front()->clone();
|
||||||
fakeReadBuffer.emplace_back(
|
fakeReadBuffer.emplace_back(std::move(copyBuf),
|
||||||
std::move(copyBuf), stream.readOffset, stream.readEOF);
|
stream.fakePeekOffset
|
||||||
|
? stream.fakePeekOffset
|
||||||
|
: stream.readOffset,
|
||||||
|
stream.readEOF);
|
||||||
stream.peekCB->onDataAvailable(
|
stream.peekCB->onDataAvailable(
|
||||||
event.streamId,
|
event.streamId,
|
||||||
folly::Range<PeekIterator>(fakeReadBuffer.cbegin(),
|
folly::Range<PeekIterator>(fakeReadBuffer.cbegin(),
|
||||||
@@ -1615,8 +1636,11 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
|
|||||||
copyBufLen,
|
copyBufLen,
|
||||||
it.first),
|
it.first),
|
||||||
continue);
|
continue);
|
||||||
fakeReadBuffer.emplace_back(
|
fakeReadBuffer.emplace_back(std::move(copyBuf),
|
||||||
std::move(copyBuf), it.second.readOffset, it.second.readEOF);
|
it.second.fakePeekOffset
|
||||||
|
? it.second.fakePeekOffset
|
||||||
|
: it.second.readOffset,
|
||||||
|
it.second.readEOF);
|
||||||
it.second.peekCB->onDataAvailable(
|
it.second.peekCB->onDataAvailable(
|
||||||
it.first,
|
it.first,
|
||||||
folly::Range<PeekIterator>(fakeReadBuffer.cbegin(),
|
folly::Range<PeekIterator>(fakeReadBuffer.cbegin(),
|
||||||
|
Reference in New Issue
Block a user