diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 775442a69..5ec1af3ba 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -2212,15 +2212,21 @@ uint64_t QuicTransportBase::maxWritableOnStream( const QuicStreamState& stream) const { auto connWritableBytes = maxWritableOnConn(); auto streamFlowControlBytes = getSendStreamFlowControlBytesAPI(stream); - auto flowControlAllowedBytes = - std::min(streamFlowControlBytes, connWritableBytes); - return flowControlAllowedBytes; + return std::min(streamFlowControlBytes, connWritableBytes); } uint64_t QuicTransportBase::maxWritableOnConn() const { auto connWritableBytes = getSendConnFlowControlBytesAPI(*conn_); auto availableBufferSpace = bufferSpaceAvailable(); - return std::min(connWritableBytes, availableBufferSpace); + uint64_t ret = std::min(connWritableBytes, availableBufferSpace); + uint8_t multiplier = conn_->transportSettings.backpressureHeadroomFactor; + if (multiplier > 0) { + auto headRoom = multiplier * congestionControlWritableBytes(*conn_); + auto bufferLen = conn_->flowControlState.sumCurStreamBufferLen; + headRoom -= bufferLen > headRoom ? headRoom : bufferLen; + ret = std::min(ret, headRoom); + } + return ret; } QuicSocket::WriteResult QuicTransportBase::writeChain( diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 879ada827..7325d9223 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -3929,6 +3929,27 @@ TEST_F(QuicTransportTest, NotifyPendingWriteConnImmediate) { evb_.loop(); } +TEST_F(QuicTransportTest, NotifyPendingWriteConnWritableBytesBacpressure) { + auto& conn = transport_->getConnectionState(); + conn.transportSettings.backpressureHeadroomFactor = 1; + EXPECT_CALL( + writeCallback_, onConnectionWriteReady(10 * kDefaultUDPSendPacketLen)); + auto mockCongestionController = + std::make_unique>(); + auto rawCongestionController = mockCongestionController.get(); + conn.congestionController = std::move(mockCongestionController); + EXPECT_CALL(*rawCongestionController, getWritableBytes()) + .WillRepeatedly(Return(10 * kDefaultUDPSendPacketLen)); + transport_->notifyPendingWriteOnConnection(&writeCallback_); + evb_.loop(); + Mock::VerifyAndClearExpectations(&writeCallback_); + EXPECT_CALL( + writeCallback_, onConnectionWriteReady(20 * kDefaultUDPSendPacketLen)); + conn.transportSettings.backpressureHeadroomFactor = 2; + transport_->notifyPendingWriteOnConnection(&writeCallback_); + evb_.loop(); +} + TEST_F(QuicTransportTest, NotifyPendingWriteStreamImmediate) { auto stream = transport_->createBidirectionalStream().value(); EXPECT_CALL(writeCallback_, onStreamWriteReady(stream, _)); @@ -4137,6 +4158,51 @@ TEST_F(QuicTransportTest, NotifyPendingWriteStreamAsyncConnBlocked) { NetworkData(IOBuf::copyBuffer("fake data"), Clock::now())); } +TEST_F(QuicTransportTest, NotifyPendingWriteStreamWritableBytesBackpressure) { + auto streamId = transport_->createBidirectionalStream().value(); + auto& conn = transport_->getConnectionState(); + conn.transportSettings.backpressureHeadroomFactor = 1; + + auto stream = conn.streamManager->getStream(streamId); + // Artificially restrict the conn flow control to have no bytes remaining. + updateFlowControlOnWriteToStream( + *stream, conn.flowControlState.peerAdvertisedMaxOffset); + updateFlowControlOnWriteToSocket( + *stream, conn.flowControlState.peerAdvertisedMaxOffset); + + EXPECT_CALL(writeCallback_, onStreamWriteReady(stream->id, _)).Times(0); + transport_->notifyPendingWriteOnStream(stream->id, &writeCallback_); + evb_.loop(); + Mock::VerifyAndClearExpectations(&writeCallback_); + + transport_->onNetworkData( + SocketAddress("::1", 10000), + NetworkData(IOBuf::copyBuffer("fake data"), Clock::now())); + + auto mockCongestionController = + std::make_unique>(); + auto rawCongestionController = mockCongestionController.get(); + conn.congestionController = std::move(mockCongestionController); + EXPECT_CALL(*rawCongestionController, getWritableBytes()) + .WillRepeatedly(Return(10 * kDefaultUDPSendPacketLen)); + + EXPECT_CALL( + writeCallback_, + onStreamWriteReady(stream->id, 10 * kDefaultUDPSendPacketLen)); + + PacketNum num = 10; + // Give the conn some headroom. + handleConnWindowUpdate( + conn, + MaxDataFrame( + conn.flowControlState.peerAdvertisedMaxOffset + + 1000 * kDefaultUDPSendPacketLen), + num); + transport_->onNetworkData( + SocketAddress("::1", 10000), + NetworkData(IOBuf::copyBuffer("fake data"), Clock::now())); +} + TEST_F(QuicTransportTest, NotifyPendingWriteStreamAsyncStreamBlocked) { auto streamId = transport_->createBidirectionalStream().value(); auto& conn = transport_->getConnectionState(); diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index 117577888..502cfce9c 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -326,6 +326,12 @@ struct TransportSettings { uint64_t maxReceiveTimestampsPerAckStored{kMaxReceivedPktsTimestampsStored}; // Close the connection completely if a migration occurs during the handshake. bool closeIfMigrationDuringHandshake{true}; + // Whether to use writable bytes to apply app backpressure via the callbacks + // for the max writable on stream or connection. The value is a multiplier + // for the writable bytes given in the callback, which may be useful for + // allowing cwnd growth. 0 disables. The amount given to callbacks has the + // current amount of stream bytes buffered subtracted from it. + uint8_t backpressureHeadroomFactor{0}; }; } // namespace quic