1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-06 22:22:38 +03:00

Keep track of the minimum reliable size ACKed

Summary:
With normal resets, we transition from `StreamSendState::ResetSent` to `StreamSendState::Closed` when we get an ACK for a RESET_STREAM frame.

With reliable resets, this is going to be a little more complicated. We can't automatically transition from `StreamSendState::ResetSent` to `StreamSendState::Closed` when we get an ACK for a RESET_STREAM_AT frame because it's possible that the peer hasn't yet received all data until the reliable reset offset.

My idea is the following: Keep track of the `minReliableSizeAcked`, which is the lowest value of the reliable size in any RESET_STREAM_AT frame that was ACKed by the peer. We set it to 0 if a RESET_STREAM frame was ACKed by the peer.

Then, we can transition from `StreamSendState::ResetSent` to `StreamSendState::Closed` if any one of the following two events happen:
* We get an ACK for a RESET_STREAM_AT or a RESET_STREAM frame, and all data until the `minReliableSizeAcked` has been ACKed by the peer.
* We get an ACK for stream data, and this puts us in a state where all data until the `minReliableSizeAcked` has been ACKed by the peer.

Note: This diff doesn't have any functional change. The only change is that we're keeping track of the `minReliableSizeAcked`, but aren't using it anywhere.

Reviewed By: mjoras

Differential Revision: D66781199

fbshipit-source-id: 2aa5138a18f70e9801e59e747460558ba706939c
This commit is contained in:
Aman Sharma
2024-12-10 16:39:37 -08:00
committed by Facebook GitHub Bot
parent 1695ecc575
commit fca90d483b
8 changed files with 119 additions and 10 deletions

View File

@@ -797,7 +797,7 @@ TEST_P(QuicTransportImplTestBase, StopSendingClosesIngress) {
// suppose we tx a rst stream (and rx its corresponding ack), expect // suppose we tx a rst stream (and rx its corresponding ack), expect
// terminal state and queued in closed streams // terminal state and queued in closed streams
transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR); transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR);
sendRstAckSMHandler(*stream); sendRstAckSMHandler(*stream, folly::none);
EXPECT_TRUE(stream->inTerminalStates()); EXPECT_TRUE(stream->inTerminalStates());
EXPECT_TRUE(streamManager.closedStreams().contains(streamID)); EXPECT_TRUE(streamManager.closedStreams().contains(streamID));
transport->driveReadCallbacks(); transport->driveReadCallbacks();
@@ -831,7 +831,7 @@ TEST_P(QuicTransportImplTestBase, StopSendingClosesIngress) {
// suppose we tx a rst stream (and rx its corresponding ack) // suppose we tx a rst stream (and rx its corresponding ack)
transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR); transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR);
sendRstAckSMHandler(*stream); sendRstAckSMHandler(*stream, folly::none);
EXPECT_EQ(stream->sendState, StreamSendState::Closed); EXPECT_EQ(stream->sendState, StreamSendState::Closed);
EXPECT_EQ(stream->recvState, StreamRecvState::Open); EXPECT_EQ(stream->recvState, StreamRecvState::Open);
transport->driveReadCallbacks(); transport->driveReadCallbacks();

View File

@@ -419,7 +419,7 @@ void QuicClientTransportLite::processUdpPacketData(
auto stream = conn_->streamManager->getStream(frame.streamId); auto stream = conn_->streamManager->getStream(frame.streamId);
if (stream) { if (stream) {
sendRstAckSMHandler(*stream); sendRstAckSMHandler(*stream, frame.reliableSize);
} }
break; break;
} }

View File

@@ -1060,7 +1060,7 @@ void onServerReadDataFromOpen(
<< frame.streamId << " " << conn; << frame.streamId << " " << conn;
auto stream = conn.streamManager->getStream(frame.streamId); auto stream = conn.streamManager->getStream(frame.streamId);
if (stream) { if (stream) {
sendRstAckSMHandler(*stream); sendRstAckSMHandler(*stream, frame.reliableSize);
} }
break; break;
} }

View File

@@ -184,6 +184,11 @@ struct QuicStreamLike {
// subsequently send a RESET_STREAM frame, we reset this value to none. // subsequently send a RESET_STREAM frame, we reset this value to none.
Optional<uint64_t> reliableSizeToPeer; Optional<uint64_t> reliableSizeToPeer;
// This is used in order to determine whether we can transition the send state
// from ResetSent to Closed. Is 0 if a RESET_STREAM without a reliable size
// has been ACKed.
Optional<uint64_t> minReliableSizeAcked;
// This is set if we send a RESET_STREAM or RELIABLE_RESET_STREAM frame to // This is set if we send a RESET_STREAM or RELIABLE_RESET_STREAM frame to
// the peer. We store this in order to ensure that we use the same value of // the peer. We store this in order to ensure that we use the same value of
// the application error code for all resets we send to the peer, as mandated // the application error code for all resets we send to the peer, as mandated

View File

@@ -191,11 +191,20 @@ void sendAckSMHandler(
} }
} }
void sendRstAckSMHandler(QuicStreamState& stream) { void sendRstAckSMHandler(
QuicStreamState& stream,
folly::Optional<uint64_t> reliableSize) {
switch (stream.sendState) { switch (stream.sendState) {
case StreamSendState::ResetSent: { case StreamSendState::ResetSent: {
VLOG(10) << "ResetSent: Transition to closed stream=" << stream.id << " " VLOG(10) << "ResetSent: Transition to closed stream=" << stream.id << " "
<< stream.conn; << stream.conn;
// Note that we set minReliableSizeAcked to 0 for non-reliable resets.
if (!stream.minReliableSizeAcked.hasValue()) {
stream.minReliableSizeAcked = reliableSize.value_or(0);
} else {
stream.minReliableSizeAcked =
std::min(*stream.minReliableSizeAcked, reliableSize.value_or(0));
}
stream.sendState = StreamSendState::Closed; stream.sendState = StreamSendState::Closed;
if (stream.inTerminalStates()) { if (stream.inTerminalStates()) {
stream.conn.streamManager->addClosed(stream.id); stream.conn.streamManager->addClosed(stream.id);

View File

@@ -25,6 +25,8 @@ void sendAckSMHandler(
QuicStreamState& stream, QuicStreamState& stream,
const WriteStreamFrame& ackedFrame); const WriteStreamFrame& ackedFrame);
void sendRstAckSMHandler(QuicStreamState& stream); void sendRstAckSMHandler(
QuicStreamState& stream,
folly::Optional<uint64_t> reliableSize);
} // namespace quic } // namespace quic

View File

@@ -96,7 +96,8 @@ TEST_F(QuicOpenStateTest, InvalidEvent) {
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
RstStreamFrame frame(1, GenericApplicationErrorCode::UNKNOWN, 0); RstStreamFrame frame(1, GenericApplicationErrorCode::UNKNOWN, 0);
EXPECT_THROW(sendRstAckSMHandler(stream), QuicTransportException); EXPECT_THROW(
sendRstAckSMHandler(stream, folly::none), QuicTransportException);
} }
TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithFIN) { TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithFIN) {
@@ -299,11 +300,103 @@ TEST_F(QuicResetSentStateTest, RstAck) {
stream.readBuffer.emplace_back( stream.readBuffer.emplace_back(
folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false);
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
sendRstAckSMHandler(stream); sendRstAckSMHandler(stream, folly::none);
EXPECT_EQ(stream.sendState, StreamSendState::Closed); EXPECT_EQ(stream.sendState, StreamSendState::Closed);
EXPECT_FALSE(stream.finalReadOffset); EXPECT_FALSE(stream.finalReadOffset);
EXPECT_FALSE(stream.readBuffer.empty()); EXPECT_FALSE(stream.readBuffer.empty());
EXPECT_EQ(*stream.minReliableSizeAcked, 0);
}
// A reset with a reliable size of 3 was previously ACKed.
// Now, we're getting an ACK for a reset with a reliable size of 5,
// and should therefore not see a reduction in minReliableSizeAcked
TEST_F(QuicResetSentStateTest, ReliableRstAckNoReduction) {
auto conn = createConn();
StreamId id = 5;
QuicStreamState stream(id, *conn);
stream.minReliableSizeAcked = 3;
stream.sendState = StreamSendState::ResetSent;
stream.currentReadOffset = 0xABCD;
stream.finalWriteOffset = 0xACDC;
stream.readBuffer.emplace_back(
folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false);
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
sendRstAckSMHandler(stream, 5);
EXPECT_EQ(stream.sendState, StreamSendState::Closed);
EXPECT_FALSE(stream.finalReadOffset);
EXPECT_FALSE(stream.readBuffer.empty());
EXPECT_EQ(*stream.minReliableSizeAcked, 3);
}
// A reset with a reliable size of 3 was previously ACKed.
// Now, we're getting an ACK for a reset with a reliable size
// of 1, and should therefore see a reduction in minReliableSizeAcked
TEST_F(QuicResetSentStateTest, ReliableRstAckReduction) {
auto conn = createConn();
StreamId id = 5;
QuicStreamState stream(id, *conn);
stream.minReliableSizeAcked = 3;
stream.sendState = StreamSendState::ResetSent;
stream.currentReadOffset = 0xABCD;
stream.finalWriteOffset = 0xACDC;
stream.readBuffer.emplace_back(
folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false);
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
sendRstAckSMHandler(stream, 1);
EXPECT_EQ(stream.sendState, StreamSendState::Closed);
EXPECT_FALSE(stream.finalReadOffset);
EXPECT_FALSE(stream.readBuffer.empty());
EXPECT_EQ(*stream.minReliableSizeAcked, 1);
}
// There were no previously ACKed resets. Therefore, when we get an
// ACK for a reset with a reliable size of 1, we should set the
// minReliableSizeAcked to 1.
TEST_F(QuicResetSentStateTest, ReliableRstAckFirstTime) {
auto conn = createConn();
StreamId id = 5;
QuicStreamState stream(id, *conn);
stream.sendState = StreamSendState::ResetSent;
stream.currentReadOffset = 0xABCD;
stream.finalWriteOffset = 0xACDC;
stream.readBuffer.emplace_back(
folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false);
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
sendRstAckSMHandler(stream, 1);
EXPECT_EQ(stream.sendState, StreamSendState::Closed);
EXPECT_FALSE(stream.finalReadOffset);
EXPECT_FALSE(stream.readBuffer.empty());
EXPECT_EQ(*stream.minReliableSizeAcked, 1);
}
// A reset with a reliable size of 3 was previously ACKed.
// Now, we're getting an ACK for a non-reliable reset, and should
// therefore set the minReliableSizeAcked to 0.
TEST_F(QuicResetSentStateTest, RstAfterReliableRst) {
auto conn = createConn();
StreamId id = 5;
QuicStreamState stream(id, *conn);
stream.minReliableSizeAcked = 3;
stream.sendState = StreamSendState::ResetSent;
stream.currentReadOffset = 0xABCD;
stream.finalWriteOffset = 0xACDC;
stream.readBuffer.emplace_back(
folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false);
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
sendRstAckSMHandler(stream, folly::none);
EXPECT_EQ(stream.sendState, StreamSendState::Closed);
EXPECT_FALSE(stream.finalReadOffset);
EXPECT_FALSE(stream.readBuffer.empty());
EXPECT_EQ(*stream.minReliableSizeAcked, 0);
} }
class QuicClosedStateTest : public Test {}; class QuicClosedStateTest : public Test {};
@@ -314,7 +407,7 @@ TEST_F(QuicClosedStateTest, RstAck) {
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.sendState = StreamSendState::Closed; stream.sendState = StreamSendState::Closed;
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
sendRstAckSMHandler(stream); sendRstAckSMHandler(stream, folly::none);
EXPECT_EQ(stream.sendState, StreamSendState::Closed); EXPECT_EQ(stream.sendState, StreamSendState::Closed);
} }

View File

@@ -1059,7 +1059,7 @@ TEST_F(QuicStateFunctionsTest, TestInvokeStreamStateMachineStreamError) {
QuicStreamState stream(1, conn); QuicStreamState stream(1, conn);
RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100); RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100);
try { try {
sendRstAckSMHandler(stream); sendRstAckSMHandler(stream, folly::none);
ADD_FAILURE(); ADD_FAILURE();
} catch (QuicTransportException& ex) { } catch (QuicTransportException& ex) {
EXPECT_EQ(ex.errorCode(), TransportErrorCode::STREAM_STATE_ERROR); EXPECT_EQ(ex.errorCode(), TransportErrorCode::STREAM_STATE_ERROR);