From fca90d483bc6c30e14b406e0877a797e7f41cecc Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Tue, 10 Dec 2024 16:39:37 -0800 Subject: [PATCH] Keep track of the minimum reliable size ACKed Summary: With normal resets, we transition from `StreamSendState::ResetSent` to `StreamSendState::Closed` when we get an ACK for a RESET_STREAM frame. With reliable resets, this is going to be a little more complicated. We can't automatically transition from `StreamSendState::ResetSent` to `StreamSendState::Closed` when we get an ACK for a RESET_STREAM_AT frame because it's possible that the peer hasn't yet received all data until the reliable reset offset. My idea is the following: Keep track of the `minReliableSizeAcked`, which is the lowest value of the reliable size in any RESET_STREAM_AT frame that was ACKed by the peer. We set it to 0 if a RESET_STREAM frame was ACKed by the peer. Then, we can transition from `StreamSendState::ResetSent` to `StreamSendState::Closed` if any one of the following two events happen: * We get an ACK for a RESET_STREAM_AT or a RESET_STREAM frame, and all data until the `minReliableSizeAcked` has been ACKed by the peer. * We get an ACK for stream data, and this puts us in a state where all data until the `minReliableSizeAcked` has been ACKed by the peer. Note: This diff doesn't have any functional change. The only change is that we're keeping track of the `minReliableSizeAcked`, but aren't using it anywhere. Reviewed By: mjoras Differential Revision: D66781199 fbshipit-source-id: 2aa5138a18f70e9801e59e747460558ba706939c --- quic/api/test/QuicTransportBaseTest.cpp | 4 +- quic/client/QuicClientTransportLite.cpp | 2 +- quic/server/state/ServerStateMachine.cpp | 2 +- quic/state/StreamData.h | 5 + quic/state/stream/StreamSendHandlers.cpp | 11 ++- quic/state/stream/StreamSendHandlers.h | 4 +- .../stream/test/StreamStateMachineTest.cpp | 99 ++++++++++++++++++- quic/state/test/QuicStateFunctionsTest.cpp | 2 +- 8 files changed, 119 insertions(+), 10 deletions(-) diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 048040c47..a8a547474 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -797,7 +797,7 @@ TEST_P(QuicTransportImplTestBase, StopSendingClosesIngress) { // suppose we tx a rst stream (and rx its corresponding ack), expect // terminal state and queued in closed streams transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR); - sendRstAckSMHandler(*stream); + sendRstAckSMHandler(*stream, folly::none); EXPECT_TRUE(stream->inTerminalStates()); EXPECT_TRUE(streamManager.closedStreams().contains(streamID)); transport->driveReadCallbacks(); @@ -831,7 +831,7 @@ TEST_P(QuicTransportImplTestBase, StopSendingClosesIngress) { // suppose we tx a rst stream (and rx its corresponding ack) transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR); - sendRstAckSMHandler(*stream); + sendRstAckSMHandler(*stream, folly::none); EXPECT_EQ(stream->sendState, StreamSendState::Closed); EXPECT_EQ(stream->recvState, StreamRecvState::Open); transport->driveReadCallbacks(); diff --git a/quic/client/QuicClientTransportLite.cpp b/quic/client/QuicClientTransportLite.cpp index c1ce4d887..b68d23c9a 100644 --- a/quic/client/QuicClientTransportLite.cpp +++ b/quic/client/QuicClientTransportLite.cpp @@ -419,7 +419,7 @@ void QuicClientTransportLite::processUdpPacketData( auto stream = conn_->streamManager->getStream(frame.streamId); if (stream) { - sendRstAckSMHandler(*stream); + sendRstAckSMHandler(*stream, frame.reliableSize); } break; } diff --git a/quic/server/state/ServerStateMachine.cpp b/quic/server/state/ServerStateMachine.cpp index d2bec5ed3..6d6a1278f 100644 --- a/quic/server/state/ServerStateMachine.cpp +++ b/quic/server/state/ServerStateMachine.cpp @@ -1060,7 +1060,7 @@ void onServerReadDataFromOpen( << frame.streamId << " " << conn; auto stream = conn.streamManager->getStream(frame.streamId); if (stream) { - sendRstAckSMHandler(*stream); + sendRstAckSMHandler(*stream, frame.reliableSize); } break; } diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index ab5cf7fe0..245c2412a 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -184,6 +184,11 @@ struct QuicStreamLike { // subsequently send a RESET_STREAM frame, we reset this value to none. Optional reliableSizeToPeer; + // This is used in order to determine whether we can transition the send state + // from ResetSent to Closed. Is 0 if a RESET_STREAM without a reliable size + // has been ACKed. + Optional minReliableSizeAcked; + // This is set if we send a RESET_STREAM or RELIABLE_RESET_STREAM frame to // the peer. We store this in order to ensure that we use the same value of // the application error code for all resets we send to the peer, as mandated diff --git a/quic/state/stream/StreamSendHandlers.cpp b/quic/state/stream/StreamSendHandlers.cpp index bbbc65acf..16b8b8cdd 100644 --- a/quic/state/stream/StreamSendHandlers.cpp +++ b/quic/state/stream/StreamSendHandlers.cpp @@ -191,11 +191,20 @@ void sendAckSMHandler( } } -void sendRstAckSMHandler(QuicStreamState& stream) { +void sendRstAckSMHandler( + QuicStreamState& stream, + folly::Optional reliableSize) { switch (stream.sendState) { case StreamSendState::ResetSent: { VLOG(10) << "ResetSent: Transition to closed stream=" << stream.id << " " << stream.conn; + // Note that we set minReliableSizeAcked to 0 for non-reliable resets. + if (!stream.minReliableSizeAcked.hasValue()) { + stream.minReliableSizeAcked = reliableSize.value_or(0); + } else { + stream.minReliableSizeAcked = + std::min(*stream.minReliableSizeAcked, reliableSize.value_or(0)); + } stream.sendState = StreamSendState::Closed; if (stream.inTerminalStates()) { stream.conn.streamManager->addClosed(stream.id); diff --git a/quic/state/stream/StreamSendHandlers.h b/quic/state/stream/StreamSendHandlers.h index 5f784a2ba..f0f731487 100644 --- a/quic/state/stream/StreamSendHandlers.h +++ b/quic/state/stream/StreamSendHandlers.h @@ -25,6 +25,8 @@ void sendAckSMHandler( QuicStreamState& stream, const WriteStreamFrame& ackedFrame); -void sendRstAckSMHandler(QuicStreamState& stream); +void sendRstAckSMHandler( + QuicStreamState& stream, + folly::Optional reliableSize); } // namespace quic diff --git a/quic/state/stream/test/StreamStateMachineTest.cpp b/quic/state/stream/test/StreamStateMachineTest.cpp index c90314e9d..d155e29dd 100644 --- a/quic/state/stream/test/StreamStateMachineTest.cpp +++ b/quic/state/stream/test/StreamStateMachineTest.cpp @@ -96,7 +96,8 @@ TEST_F(QuicOpenStateTest, InvalidEvent) { StreamId id = 5; QuicStreamState stream(id, *conn); RstStreamFrame frame(1, GenericApplicationErrorCode::UNKNOWN, 0); - EXPECT_THROW(sendRstAckSMHandler(stream), QuicTransportException); + EXPECT_THROW( + sendRstAckSMHandler(stream, folly::none), QuicTransportException); } TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithFIN) { @@ -299,11 +300,103 @@ TEST_F(QuicResetSentStateTest, RstAck) { stream.readBuffer.emplace_back( folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); - sendRstAckSMHandler(stream); + sendRstAckSMHandler(stream, folly::none); EXPECT_EQ(stream.sendState, StreamSendState::Closed); EXPECT_FALSE(stream.finalReadOffset); EXPECT_FALSE(stream.readBuffer.empty()); + EXPECT_EQ(*stream.minReliableSizeAcked, 0); +} + +// A reset with a reliable size of 3 was previously ACKed. +// Now, we're getting an ACK for a reset with a reliable size of 5, +// and should therefore not see a reduction in minReliableSizeAcked +TEST_F(QuicResetSentStateTest, ReliableRstAckNoReduction) { + auto conn = createConn(); + StreamId id = 5; + + QuicStreamState stream(id, *conn); + stream.minReliableSizeAcked = 3; + stream.sendState = StreamSendState::ResetSent; + stream.currentReadOffset = 0xABCD; + stream.finalWriteOffset = 0xACDC; + stream.readBuffer.emplace_back( + folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); + RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); + sendRstAckSMHandler(stream, 5); + + EXPECT_EQ(stream.sendState, StreamSendState::Closed); + EXPECT_FALSE(stream.finalReadOffset); + EXPECT_FALSE(stream.readBuffer.empty()); + EXPECT_EQ(*stream.minReliableSizeAcked, 3); +} + +// A reset with a reliable size of 3 was previously ACKed. +// Now, we're getting an ACK for a reset with a reliable size +// of 1, and should therefore see a reduction in minReliableSizeAcked +TEST_F(QuicResetSentStateTest, ReliableRstAckReduction) { + auto conn = createConn(); + StreamId id = 5; + + QuicStreamState stream(id, *conn); + stream.minReliableSizeAcked = 3; + stream.sendState = StreamSendState::ResetSent; + stream.currentReadOffset = 0xABCD; + stream.finalWriteOffset = 0xACDC; + stream.readBuffer.emplace_back( + folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); + RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); + sendRstAckSMHandler(stream, 1); + + EXPECT_EQ(stream.sendState, StreamSendState::Closed); + EXPECT_FALSE(stream.finalReadOffset); + EXPECT_FALSE(stream.readBuffer.empty()); + EXPECT_EQ(*stream.minReliableSizeAcked, 1); +} + +// There were no previously ACKed resets. Therefore, when we get an +// ACK for a reset with a reliable size of 1, we should set the +// minReliableSizeAcked to 1. +TEST_F(QuicResetSentStateTest, ReliableRstAckFirstTime) { + auto conn = createConn(); + StreamId id = 5; + + QuicStreamState stream(id, *conn); + stream.sendState = StreamSendState::ResetSent; + stream.currentReadOffset = 0xABCD; + stream.finalWriteOffset = 0xACDC; + stream.readBuffer.emplace_back( + folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); + RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); + sendRstAckSMHandler(stream, 1); + + EXPECT_EQ(stream.sendState, StreamSendState::Closed); + EXPECT_FALSE(stream.finalReadOffset); + EXPECT_FALSE(stream.readBuffer.empty()); + EXPECT_EQ(*stream.minReliableSizeAcked, 1); +} + +// A reset with a reliable size of 3 was previously ACKed. +// Now, we're getting an ACK for a non-reliable reset, and should +// therefore set the minReliableSizeAcked to 0. +TEST_F(QuicResetSentStateTest, RstAfterReliableRst) { + auto conn = createConn(); + StreamId id = 5; + + QuicStreamState stream(id, *conn); + stream.minReliableSizeAcked = 3; + stream.sendState = StreamSendState::ResetSent; + stream.currentReadOffset = 0xABCD; + stream.finalWriteOffset = 0xACDC; + stream.readBuffer.emplace_back( + folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); + RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); + sendRstAckSMHandler(stream, folly::none); + + EXPECT_EQ(stream.sendState, StreamSendState::Closed); + EXPECT_FALSE(stream.finalReadOffset); + EXPECT_FALSE(stream.readBuffer.empty()); + EXPECT_EQ(*stream.minReliableSizeAcked, 0); } class QuicClosedStateTest : public Test {}; @@ -314,7 +407,7 @@ TEST_F(QuicClosedStateTest, RstAck) { QuicStreamState stream(id, *conn); stream.sendState = StreamSendState::Closed; RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); - sendRstAckSMHandler(stream); + sendRstAckSMHandler(stream, folly::none); EXPECT_EQ(stream.sendState, StreamSendState::Closed); } diff --git a/quic/state/test/QuicStateFunctionsTest.cpp b/quic/state/test/QuicStateFunctionsTest.cpp index 46e31f759..3af329c39 100644 --- a/quic/state/test/QuicStateFunctionsTest.cpp +++ b/quic/state/test/QuicStateFunctionsTest.cpp @@ -1059,7 +1059,7 @@ TEST_F(QuicStateFunctionsTest, TestInvokeStreamStateMachineStreamError) { QuicStreamState stream(1, conn); RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100); try { - sendRstAckSMHandler(stream); + sendRstAckSMHandler(stream, folly::none); ADD_FAILURE(); } catch (QuicTransportException& ex) { EXPECT_EQ(ex.errorCode(), TransportErrorCode::STREAM_STATE_ERROR);