diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index bc0b68a22..8c68156ba 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -1788,6 +1788,8 @@ TEST_F(QuicTransportTest, WriteFlowControl) { auto buf1 = buf->clone(); buf1->trimEnd(50); stream->flowControlState.peerAdvertisedMaxOffset = 200; + // Reset the pendingBlockedFrame as if we received a flow control update. + stream->flowControlState.pendingBlockedFrame = false; EXPECT_CALL(*mockQLogger, addTransportStateUpdate(getFlowControlEvent(200))); conn.streamManager->updateWritableStreams(*stream); EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength)); diff --git a/quic/flowcontrol/QuicFlowController.cpp b/quic/flowcontrol/QuicFlowController.cpp index c0c1d0ca0..f98818ee6 100644 --- a/quic/flowcontrol/QuicFlowController.cpp +++ b/quic/flowcontrol/QuicFlowController.cpp @@ -269,9 +269,11 @@ void maybeWriteBlockAfterSocketWrite(QuicStreamState& stream) { (!stream.writeBuffer.empty() || stream.writeBufMeta.length > 0); } - if (shouldEmitStreamBlockedFrame) { + if (shouldEmitStreamBlockedFrame && + !stream.flowControlState.pendingBlockedFrame) { stream.conn.streamManager->queueBlocked( stream.id, stream.flowControlState.peerAdvertisedMaxOffset); + stream.flowControlState.pendingBlockedFrame = true; if (stream.conn.qLogger) { stream.conn.qLogger->addTransportStateUpdate( getFlowControlEvent(stream.flowControlState.peerAdvertisedMaxOffset)); @@ -286,6 +288,7 @@ void handleStreamWindowUpdate( PacketNum packetNum) { if (stream.flowControlState.peerAdvertisedMaxOffset <= maximumData) { stream.flowControlState.peerAdvertisedMaxOffset = maximumData; + stream.flowControlState.pendingBlockedFrame = false; if (stream.flowControlState.peerAdvertisedMaxOffset > stream.currentWriteOffset + stream.writeBuffer.chainLength() + stream.writeBufMeta.length) { diff --git a/quic/flowcontrol/test/QuicFlowControlTest.cpp b/quic/flowcontrol/test/QuicFlowControlTest.cpp index 77ef7a934..b71460cbd 100644 --- a/quic/flowcontrol/test/QuicFlowControlTest.cpp +++ b/quic/flowcontrol/test/QuicFlowControlTest.cpp @@ -347,6 +347,40 @@ TEST_F(QuicFlowControlTest, MaybeWriteBlockedAfterAPIWrite) { maybeWriteBlockAfterAPIWrite(stream); EXPECT_TRUE(conn_.streamManager->hasBlocked()); } +TEST_F(QuicFlowControlTest, UpdateFlowControlClearsStreamBlockedFlag) { + StreamId id = 3; + QuicStreamState stream(id, conn_); + stream.conn.transportSettings.useNewStreamBlockedCondition = true; + stream.currentWriteOffset = 400; + stream.flowControlState.peerAdvertisedMaxOffset = 400; + + // The flag is off initially. + EXPECT_FALSE(stream.flowControlState.pendingBlockedFrame); + + // Stream blocked. + EXPECT_CALL(*quicStats_, onStreamFlowControlBlocked()).Times(1); + maybeWriteBlockAfterSocketWrite(stream); + EXPECT_TRUE(conn_.streamManager->hasBlocked()); + EXPECT_TRUE(stream.flowControlState.pendingBlockedFrame); + + // Stream still blocked, a peding blocked frame flag. + EXPECT_CALL(*quicStats_, onStreamFlowControlBlocked()).Times(0); + maybeWriteBlockAfterSocketWrite(stream); + EXPECT_TRUE(conn_.streamManager->hasBlocked()); + EXPECT_TRUE(stream.flowControlState.pendingBlockedFrame); + + // A flow control update resets the pendingBlockedFrame flag + uint64_t newMaximumData = 800; + handleStreamWindowUpdate(stream, newMaximumData, 123 /* PacketNum */); + EXPECT_FALSE(stream.flowControlState.pendingBlockedFrame); + + // Stream blocked again. + stream.currentWriteOffset = 800; + EXPECT_CALL(*quicStats_, onStreamFlowControlBlocked()).Times(1); + maybeWriteBlockAfterSocketWrite(stream); + EXPECT_TRUE(conn_.streamManager->hasBlocked()); + EXPECT_TRUE(stream.flowControlState.pendingBlockedFrame); +} TEST_F(QuicFlowControlTest, MaybeWriteBlockedAfterSocketWrite) { StreamId id = 3; @@ -364,9 +398,10 @@ TEST_F(QuicFlowControlTest, MaybeWriteBlockedAfterSocketWrite) { maybeWriteBlockAfterSocketWrite(stream); EXPECT_TRUE(conn_.streamManager->hasBlocked()); - // Now write something + // Now write something - onStreamFlowControlBlocked() is not called because + // we've already issued a blocked stream frame. stream.writeBuffer.append(IOBuf::copyBuffer("1234")); - EXPECT_CALL(*quicStats_, onStreamFlowControlBlocked()).Times(1); + EXPECT_CALL(*quicStats_, onStreamFlowControlBlocked()).Times(0); maybeWriteBlockAfterSocketWrite(stream); EXPECT_TRUE(conn_.streamManager->hasBlocked()); diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index 1142e990f..45f099faf 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -335,6 +335,11 @@ struct QuicStreamState : public QuicStreamLike { uint64_t peerAdvertisedMaxOffset{0}; // Time at which the last flow control update was sent by the transport. folly::Optional timeOfLastFlowControlUpdate; + // A flag indicating if the stream has a pending blocked frame to the peer + // (blocked frame sent, but a stream flow control update has not been + // received yet). Set when we write a blocked data frame on the stream; + // cleared when we receive a flow control update for the stream. + bool pendingBlockedFrame{false}; }; StreamFlowControlState flowControlState;