diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 9a34e53f8..97e1b8954 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -812,6 +812,9 @@ folly::Expected QuicTransportBase::stopSending( return folly::unit; } + if (conn_->transportSettings.dropIngressOnStopSending) { + processTxStopSending(*stream); + } // send STOP_SENDING frame to peer sendSimpleFrame(*conn_, StopSendingFrame(id, error)); updateWriteLooper(true); diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index e4306181d..93ac1147b 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -384,6 +385,10 @@ class TestQuicTransport return writeLooper_; } + auto& readLooper() { + return readLooper_; + } + void unbindConnection() {} void onReadError(const folly::AsyncSocketException&) noexcept {} @@ -668,6 +673,116 @@ TEST_P(QuicTransportImplTestBase, IdleTimeoutStreamMaessage) { } TEST_P(QuicTransportImplTestBase, StopSendingClosesIngress) { + // update transport settings + auto transportSettings = transport->getTransportSettings(); + transportSettings.dropIngressOnStopSending = true; + transport->setTransportSettings(transportSettings); + auto& streamManager = *transport->transportConn->streamManager; + + auto unknownErrorCode = GenericApplicationErrorCode::UNKNOWN; + std::string ingressData = "some ingress stream data"; + auto ingressDataLen = ingressData.size(); + + StreamId streamID; + QuicStreamState* stream; + + // create bidirectional stream + streamID = transport->createBidirectionalStream().value(); + NiceMock readCb1; + transport->setReadCallback(streamID, &readCb1); + + // add ingress & egress data to stream + transport->addDataToStream( + streamID, StreamBuffer(folly::IOBuf::copyBuffer(ingressData), 0)); + transport->writeChain( + streamID, folly::IOBuf::copyBuffer("some egress stream data"), false); + transport->driveReadCallbacks(); + stream = CHECK_NOTNULL(transport->getStream(streamID)); + + // check stream has readable data and SM is open + EXPECT_TRUE(stream->hasReadableData()); + EXPECT_EQ(stream->sendState, StreamSendState::Open); + EXPECT_EQ(stream->recvState, StreamRecvState::Open); + + // send stop sending to peer – this and later invoking reset stream should not + // invoke ReadCallback::readError() + EXPECT_CALL(readCb1, readError(streamID, _)).Times(0); + transport->stopSending(streamID, GenericApplicationErrorCode::NO_ERROR); + + // check that we've discarded any ingress data and ingress SM is closed + EXPECT_FALSE(stream->hasReadableData()); + EXPECT_FALSE(streamManager.readableStreams().contains(streamID)); + EXPECT_EQ(stream->sendState, StreamSendState::Open); + EXPECT_EQ(stream->recvState, StreamRecvState::Closed); + + // suppose we tx a rst stream (and rx its corresponding ack), expect + // terminal state and queued in closed streams + transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR); + sendRstAckSMHandler(*stream); + EXPECT_TRUE(stream->inTerminalStates()); + EXPECT_TRUE(streamManager.closedStreams().contains(streamID)); + transport->driveReadCallbacks(); + + // now if we rx a rst_stream we should deliver ReadCallback::readError() + EXPECT_TRUE(streamManager.streamExists(streamID)); + EXPECT_CALL(readCb1, readError(streamID, QuicError(unknownErrorCode))) + .Times(1); + receiveRstStreamSMHandler( + *stream, RstStreamFrame(streamID, unknownErrorCode, ingressDataLen)); + transport->readLooper()->runLoopCallback(); + + // same test as above, but we tx a rst stream first followed by send stop + // sending second to validate that .stopSending() queues stream to be closed + NiceMock readCb2; + streamID = transport->createBidirectionalStream().value(); + transport->setReadCallback(streamID, &readCb2); + + // add ingress & egress data to new stream + transport->addDataToStream( + streamID, StreamBuffer(folly::IOBuf::copyBuffer(ingressData), 0)); + transport->writeChain( + streamID, folly::IOBuf::copyBuffer("some egress stream data"), false); + transport->driveReadCallbacks(); + stream = CHECK_NOTNULL(transport->getStream(streamID)); + + // check stream has readable data and SM is open + EXPECT_TRUE(stream->hasReadableData()); + EXPECT_EQ(stream->sendState, StreamSendState::Open); + EXPECT_EQ(stream->recvState, StreamRecvState::Open); + + // suppose we tx a rst stream (and rx its corresponding ack) + transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR); + sendRstAckSMHandler(*stream); + EXPECT_EQ(stream->sendState, StreamSendState::Closed); + EXPECT_EQ(stream->recvState, StreamRecvState::Open); + transport->driveReadCallbacks(); + + // send stop sending to peer – does not deliver an error to the read callback + // even tho the stream is in terminal state and queued for closing + EXPECT_CALL(readCb2, readError(streamID, _)).Times(0); + transport->stopSending(streamID, GenericApplicationErrorCode::NO_ERROR); + + // check that we've discarded any ingress data and ingress SM is closed, + // expect terminal state and queued in closed streams + EXPECT_FALSE(stream->hasReadableData()); + EXPECT_FALSE(streamManager.readableStreams().contains(streamID)); + EXPECT_TRUE(stream->inTerminalStates()); + EXPECT_TRUE(streamManager.closedStreams().contains(streamID)); + + // we need to rx a rst stream before queue stream to be closed to allow + // delivering callback to application + EXPECT_CALL(readCb2, readError(streamID, QuicError(unknownErrorCode))) + .Times(1); + receiveRstStreamSMHandler( + *stream, RstStreamFrame(streamID, unknownErrorCode, ingressDataLen)); + EXPECT_TRUE(stream->inTerminalStates()); + EXPECT_TRUE(streamManager.closedStreams().contains(streamID)); + transport->readLooper()->runLoopCallback(); + + transport.reset(); +} + +TEST_P(QuicTransportImplTestBase, NoopStopSendingIngressClosed) { // create bidi stream auto streamID = transport->createBidirectionalStream().value(); auto* stream = CHECK_NOTNULL(transport->getStream(streamID)); diff --git a/quic/state/QuicStreamFunctions.cpp b/quic/state/QuicStreamFunctions.cpp index ee98eb3da..b887dcb9e 100644 --- a/quic/state/QuicStreamFunctions.cpp +++ b/quic/state/QuicStreamFunctions.cpp @@ -495,4 +495,16 @@ void processCryptoStreamAck( } cryptoStream.retransmissionBuffer.erase(ackedBuffer); } + +void processTxStopSending(QuicStreamState& stream) { + // no longer interested in ingress + auto id = stream.id; + auto& streamManager = stream.conn.streamManager; + stream.recvState = StreamRecvState::Closed; + stream.readBuffer.clear(); + streamManager->readableStreams().erase(id); + if (stream.inTerminalStates()) { + streamManager->addClosed(id); + } +} } // namespace quic diff --git a/quic/state/QuicStreamFunctions.h b/quic/state/QuicStreamFunctions.h index f0bf73086..c60abc022 100644 --- a/quic/state/QuicStreamFunctions.h +++ b/quic/state/QuicStreamFunctions.h @@ -162,4 +162,7 @@ void processCryptoStreamAck( uint64_t offset, uint64_t len); +// Drops ingress when sending STOP_SENDING to peer +void processTxStopSending(QuicStreamState& stream); + } // namespace quic diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index 0a34c35a1..4ef1b8975 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -323,6 +323,8 @@ struct TransportSettings { // 0 means stream groups are disabled. uint64_t maxStreamGroupsAdvertized{0}; bool experimentalPacer{false}; + // experimental flag to close ingress SM when invoking stopSending + bool dropIngressOnStopSending{false}; }; } // namespace quic diff --git a/quic/state/stream/StreamReceiveHandlers.cpp b/quic/state/stream/StreamReceiveHandlers.cpp index 578d75a15..405bf2b6b 100644 --- a/quic/state/stream/StreamReceiveHandlers.cpp +++ b/quic/state/stream/StreamReceiveHandlers.cpp @@ -57,10 +57,8 @@ void receiveReadStreamFrameSMHandler( } case StreamRecvState::Closed: { CHECK(!isSendingStream(stream.conn.nodeType, stream.id)); - VLOG_IF(10, frame.fin) << "Closed: Received data with fin" - << " stream=" << stream.id << " " << stream.conn; - appendDataToReadBuffer( - stream, StreamBuffer(std::move(frame.data), frame.offset, frame.fin)); + VLOG(10) << "Closed: Received discarding data stream=" << stream.id + << " fin=" << frame.fin << " " << stream.conn; break; } case StreamRecvState::Invalid: {