mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-06 22:22:38 +03:00
Allow unset read callback during connection close
Summary: setReadCallback didn't function properly during shutdown 1) it was completely ignored when state_ == CLOSING 2) cancelAllAppCallbacks made a copy of readCallbacks_ This is problematic for application constructs that use groups of streams -- eg: HTTP WebTransport. When one stream (the WebTransport session) is reset during shutdown, it needs to clean up any dependent streams as well, including preventing them from getting error callbacks. The fix is to use only the streamId's from readCallbacksCopy for iteration, but look up the actual callback value from readCallbacks_. Note: there's an as yet unhandled corner case which is that a readError callback installs a new stream read callback, but it seems far fetched enough I'm not including a fix here. Reviewed By: sharmafb Differential Revision: D48159644 fbshipit-source-id: c9d522a6b200538193969d60d242a505831e4cd0
This commit is contained in:
committed by
Facebook GitHub Bot
parent
185d853a9c
commit
900e22e18d
@@ -753,7 +753,7 @@ QuicTransportBaseLite::setReadCallback(
|
|||||||
if (isSendingStream(conn_->nodeType, id)) {
|
if (isSendingStream(conn_->nodeType, id)) {
|
||||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||||
}
|
}
|
||||||
if (closeState_ != CloseState::OPEN) {
|
if (cb != nullptr && closeState_ != CloseState::OPEN) {
|
||||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||||
}
|
}
|
||||||
if (!conn_->streamManager->streamExists(id)) {
|
if (!conn_->streamManager->streamExists(id)) {
|
||||||
@@ -2224,20 +2224,28 @@ void QuicTransportBaseLite::cancelAllAppCallbacks(
|
|||||||
// structure of read callbacks.
|
// structure of read callbacks.
|
||||||
// TODO: this approach will make the app unable to setReadCallback to
|
// TODO: this approach will make the app unable to setReadCallback to
|
||||||
// nullptr during the loop. Need to fix that.
|
// nullptr during the loop. Need to fix that.
|
||||||
// TODO: setReadCallback to nullptr closes the stream, so the app
|
|
||||||
// may just do that...
|
|
||||||
auto readCallbacksCopy = readCallbacks_;
|
auto readCallbacksCopy = readCallbacks_;
|
||||||
for (auto& cb : readCallbacksCopy) {
|
for (auto& cb : readCallbacksCopy) {
|
||||||
readCallbacks_.erase(cb.first);
|
auto streamId = cb.first;
|
||||||
if (cb.second.readCb) {
|
auto it = readCallbacks_.find(streamId);
|
||||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(cb.first));
|
if (it == readCallbacks_.end()) {
|
||||||
|
// An earlier call to readError removed the stream from readCallbacks
|
||||||
|
// May not be possible?
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (it->second.readCb) {
|
||||||
|
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId));
|
||||||
if (!stream->groupId) {
|
if (!stream->groupId) {
|
||||||
cb.second.readCb->readError(cb.first, err);
|
it->second.readCb->readError(streamId, err);
|
||||||
} else {
|
} else {
|
||||||
cb.second.readCb->readErrorWithGroup(cb.first, *stream->groupId, err);
|
it->second.readCb->readErrorWithGroup(streamId, *stream->groupId, err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
readCallbacks_.erase(it);
|
||||||
}
|
}
|
||||||
|
// TODO: what if a call to readError installs a new read callback?
|
||||||
|
LOG_IF(ERROR, !readCallbacks_.empty())
|
||||||
|
<< readCallbacks_.size() << " read callbacks remaining to be cleared";
|
||||||
|
|
||||||
VLOG(4) << "Clearing datagram callback";
|
VLOG(4) << "Clearing datagram callback";
|
||||||
datagramCallback_ = nullptr;
|
datagramCallback_ = nullptr;
|
||||||
|
@@ -48,6 +48,7 @@ cpp_unittest(
|
|||||||
"//folly/io:iobuf",
|
"//folly/io:iobuf",
|
||||||
"//quic:constants",
|
"//quic:constants",
|
||||||
"//quic/api:transport",
|
"//quic/api:transport",
|
||||||
|
"//quic/api:transport_helpers",
|
||||||
"//quic/common:buf_util",
|
"//quic/common:buf_util",
|
||||||
"//quic/common/events:highres_quic_timer",
|
"//quic/common/events:highres_quic_timer",
|
||||||
"//quic/common/test:test_utils",
|
"//quic/common/test:test_utils",
|
||||||
@@ -98,7 +99,7 @@ cpp_unittest(
|
|||||||
deps = [
|
deps = [
|
||||||
":mocks",
|
":mocks",
|
||||||
"//folly:range",
|
"//folly:range",
|
||||||
"//quic/api:transport",
|
"//quic/api:transport_helpers",
|
||||||
"//quic/common/events:folly_eventbase",
|
"//quic/common/events:folly_eventbase",
|
||||||
"//quic/common/test:test_utils",
|
"//quic/common/test:test_utils",
|
||||||
"//quic/common/testutil:mock_async_udp_socket",
|
"//quic/common/testutil:mock_async_udp_socket",
|
||||||
@@ -119,7 +120,7 @@ cpp_unittest(
|
|||||||
deps = [
|
deps = [
|
||||||
":mocks",
|
":mocks",
|
||||||
"//folly/portability:gtest",
|
"//folly/portability:gtest",
|
||||||
"//quic/api:transport",
|
"//quic/api:transport_helpers",
|
||||||
"//quic/client:state_and_handshake",
|
"//quic/client:state_and_handshake",
|
||||||
"//quic/codec:pktbuilder",
|
"//quic/codec:pktbuilder",
|
||||||
"//quic/codec/test:mocks",
|
"//quic/codec/test:mocks",
|
||||||
@@ -140,7 +141,7 @@ cpp_unittest(
|
|||||||
"IoBufQuicBatchTest.cpp",
|
"IoBufQuicBatchTest.cpp",
|
||||||
],
|
],
|
||||||
deps = [
|
deps = [
|
||||||
"//quic/api:transport",
|
"//quic/api:transport_helpers",
|
||||||
"//quic/client:state_and_handshake",
|
"//quic/client:state_and_handshake",
|
||||||
"//quic/common/events:folly_eventbase",
|
"//quic/common/events:folly_eventbase",
|
||||||
"//quic/common/test:test_utils",
|
"//quic/common/test:test_utils",
|
||||||
@@ -233,6 +234,7 @@ mvfst_cpp_library(
|
|||||||
],
|
],
|
||||||
exported_deps = [
|
exported_deps = [
|
||||||
"//quic/api:transport",
|
"//quic/api:transport",
|
||||||
|
"//quic/api:transport_helpers",
|
||||||
"//quic/common/test:test_utils",
|
"//quic/common/test:test_utils",
|
||||||
"//quic/dsr/frontend:write_functions",
|
"//quic/dsr/frontend:write_functions",
|
||||||
"//quic/fizz/server/handshake:fizz_server_handshake",
|
"//quic/fizz/server/handshake:fizz_server_handshake",
|
||||||
|
@@ -3094,12 +3094,15 @@ TEST_P(QuicTransportImplTestBase, TestGracefulCloseWithNoActiveStream) {
|
|||||||
|
|
||||||
TEST_P(QuicTransportImplTestBase, TestImmediateClose) {
|
TEST_P(QuicTransportImplTestBase, TestImmediateClose) {
|
||||||
auto stream = transport->createBidirectionalStream().value();
|
auto stream = transport->createBidirectionalStream().value();
|
||||||
|
auto stream2 = transport->createBidirectionalStream().value();
|
||||||
NiceMock<MockWriteCallback> wcb;
|
NiceMock<MockWriteCallback> wcb;
|
||||||
NiceMock<MockWriteCallback> wcbConn;
|
NiceMock<MockWriteCallback> wcbConn;
|
||||||
NiceMock<MockReadCallback> rcb;
|
NiceMock<MockReadCallback> rcb;
|
||||||
|
NiceMock<MockReadCallback> rcb2;
|
||||||
NiceMock<MockPeekCallback> pcb;
|
NiceMock<MockPeekCallback> pcb;
|
||||||
NiceMock<MockDeliveryCallback> deliveryCb;
|
NiceMock<MockDeliveryCallback> deliveryCb;
|
||||||
NiceMock<MockByteEventCallback> txCb;
|
NiceMock<MockByteEventCallback> txCb;
|
||||||
|
uint8_t resetCount = 0;
|
||||||
EXPECT_CALL(
|
EXPECT_CALL(
|
||||||
wcb,
|
wcb,
|
||||||
onStreamWriteError(
|
onStreamWriteError(
|
||||||
@@ -3107,8 +3110,21 @@ TEST_P(QuicTransportImplTestBase, TestImmediateClose) {
|
|||||||
EXPECT_CALL(
|
EXPECT_CALL(
|
||||||
wcbConn,
|
wcbConn,
|
||||||
onConnectionWriteError(IsAppError(GenericApplicationErrorCode::UNKNOWN)));
|
onConnectionWriteError(IsAppError(GenericApplicationErrorCode::UNKNOWN)));
|
||||||
EXPECT_CALL(
|
// The first stream to get a reset will clear the other read callback, so only
|
||||||
rcb, readError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
|
// one will receive a reset.
|
||||||
|
ON_CALL(
|
||||||
|
rcb, readError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)))
|
||||||
|
.WillByDefault(InvokeWithoutArgs([this, stream2, &resetCount] {
|
||||||
|
transport->setReadCallback(stream2, nullptr);
|
||||||
|
resetCount++;
|
||||||
|
}));
|
||||||
|
ON_CALL(
|
||||||
|
rcb2,
|
||||||
|
readError(stream2, IsAppError(GenericApplicationErrorCode::UNKNOWN)))
|
||||||
|
.WillByDefault(InvokeWithoutArgs([this, stream, &resetCount] {
|
||||||
|
transport->setReadCallback(stream, nullptr);
|
||||||
|
resetCount++;
|
||||||
|
}));
|
||||||
EXPECT_CALL(
|
EXPECT_CALL(
|
||||||
pcb, peekError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
|
pcb, peekError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
|
||||||
EXPECT_CALL(deliveryCb, onCanceled(stream, _));
|
EXPECT_CALL(deliveryCb, onCanceled(stream, _));
|
||||||
@@ -3120,6 +3136,7 @@ TEST_P(QuicTransportImplTestBase, TestImmediateClose) {
|
|||||||
transport->notifyPendingWriteOnConnection(&wcbConn);
|
transport->notifyPendingWriteOnConnection(&wcbConn);
|
||||||
transport->notifyPendingWriteOnStream(stream, &wcb);
|
transport->notifyPendingWriteOnStream(stream, &wcb);
|
||||||
transport->setReadCallback(stream, &rcb);
|
transport->setReadCallback(stream, &rcb);
|
||||||
|
transport->setReadCallback(stream2, &rcb2);
|
||||||
transport->setPeekCallback(stream, &pcb);
|
transport->setPeekCallback(stream, &pcb);
|
||||||
EXPECT_CALL(*socketPtr, write(_, _, _))
|
EXPECT_CALL(*socketPtr, write(_, _, _))
|
||||||
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
|
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
|
||||||
@@ -3151,6 +3168,7 @@ TEST_P(QuicTransportImplTestBase, TestImmediateClose) {
|
|||||||
EXPECT_EQ(
|
EXPECT_EQ(
|
||||||
transport->transportConn->streamManager->getStream(stream), nullptr);
|
transport->transportConn->streamManager->getStream(stream), nullptr);
|
||||||
qEvb->loopOnce();
|
qEvb->loopOnce();
|
||||||
|
EXPECT_EQ(resetCount, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_P(QuicTransportImplTestBase, ResetStreamUnsetWriteCallback) {
|
TEST_P(QuicTransportImplTestBase, ResetStreamUnsetWriteCallback) {
|
||||||
|
Reference in New Issue
Block a user