1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Optional writable bytes backpressure

Summary: Add backpressure based on CCA's writable bytes (cwnd), which can be useful for the app to use as backpressure. This also takes into account stream bytes currently buffered.

Differential Revision: D52098629

fbshipit-source-id: c672e73bab01eb301a67c9b8a95225780000cbd6
This commit is contained in:
Matt Joras
2023-12-15 21:58:10 -08:00
committed by Facebook GitHub Bot
parent efef384c02
commit cc927374d7
3 changed files with 82 additions and 4 deletions

View File

@@ -2212,15 +2212,21 @@ uint64_t QuicTransportBase::maxWritableOnStream(
const QuicStreamState& stream) const { const QuicStreamState& stream) const {
auto connWritableBytes = maxWritableOnConn(); auto connWritableBytes = maxWritableOnConn();
auto streamFlowControlBytes = getSendStreamFlowControlBytesAPI(stream); auto streamFlowControlBytes = getSendStreamFlowControlBytesAPI(stream);
auto flowControlAllowedBytes = return std::min(streamFlowControlBytes, connWritableBytes);
std::min(streamFlowControlBytes, connWritableBytes);
return flowControlAllowedBytes;
} }
uint64_t QuicTransportBase::maxWritableOnConn() const { uint64_t QuicTransportBase::maxWritableOnConn() const {
auto connWritableBytes = getSendConnFlowControlBytesAPI(*conn_); auto connWritableBytes = getSendConnFlowControlBytesAPI(*conn_);
auto availableBufferSpace = bufferSpaceAvailable(); auto availableBufferSpace = bufferSpaceAvailable();
return std::min(connWritableBytes, availableBufferSpace); uint64_t ret = std::min(connWritableBytes, availableBufferSpace);
uint8_t multiplier = conn_->transportSettings.backpressureHeadroomFactor;
if (multiplier > 0) {
auto headRoom = multiplier * congestionControlWritableBytes(*conn_);
auto bufferLen = conn_->flowControlState.sumCurStreamBufferLen;
headRoom -= bufferLen > headRoom ? headRoom : bufferLen;
ret = std::min(ret, headRoom);
}
return ret;
} }
QuicSocket::WriteResult QuicTransportBase::writeChain( QuicSocket::WriteResult QuicTransportBase::writeChain(

View File

@@ -3929,6 +3929,27 @@ TEST_F(QuicTransportTest, NotifyPendingWriteConnImmediate) {
evb_.loop(); evb_.loop();
} }
TEST_F(QuicTransportTest, NotifyPendingWriteConnWritableBytesBacpressure) {
auto& conn = transport_->getConnectionState();
conn.transportSettings.backpressureHeadroomFactor = 1;
EXPECT_CALL(
writeCallback_, onConnectionWriteReady(10 * kDefaultUDPSendPacketLen));
auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get();
conn.congestionController = std::move(mockCongestionController);
EXPECT_CALL(*rawCongestionController, getWritableBytes())
.WillRepeatedly(Return(10 * kDefaultUDPSendPacketLen));
transport_->notifyPendingWriteOnConnection(&writeCallback_);
evb_.loop();
Mock::VerifyAndClearExpectations(&writeCallback_);
EXPECT_CALL(
writeCallback_, onConnectionWriteReady(20 * kDefaultUDPSendPacketLen));
conn.transportSettings.backpressureHeadroomFactor = 2;
transport_->notifyPendingWriteOnConnection(&writeCallback_);
evb_.loop();
}
TEST_F(QuicTransportTest, NotifyPendingWriteStreamImmediate) { TEST_F(QuicTransportTest, NotifyPendingWriteStreamImmediate) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
EXPECT_CALL(writeCallback_, onStreamWriteReady(stream, _)); EXPECT_CALL(writeCallback_, onStreamWriteReady(stream, _));
@@ -4137,6 +4158,51 @@ TEST_F(QuicTransportTest, NotifyPendingWriteStreamAsyncConnBlocked) {
NetworkData(IOBuf::copyBuffer("fake data"), Clock::now())); NetworkData(IOBuf::copyBuffer("fake data"), Clock::now()));
} }
TEST_F(QuicTransportTest, NotifyPendingWriteStreamWritableBytesBackpressure) {
auto streamId = transport_->createBidirectionalStream().value();
auto& conn = transport_->getConnectionState();
conn.transportSettings.backpressureHeadroomFactor = 1;
auto stream = conn.streamManager->getStream(streamId);
// Artificially restrict the conn flow control to have no bytes remaining.
updateFlowControlOnWriteToStream(
*stream, conn.flowControlState.peerAdvertisedMaxOffset);
updateFlowControlOnWriteToSocket(
*stream, conn.flowControlState.peerAdvertisedMaxOffset);
EXPECT_CALL(writeCallback_, onStreamWriteReady(stream->id, _)).Times(0);
transport_->notifyPendingWriteOnStream(stream->id, &writeCallback_);
evb_.loop();
Mock::VerifyAndClearExpectations(&writeCallback_);
transport_->onNetworkData(
SocketAddress("::1", 10000),
NetworkData(IOBuf::copyBuffer("fake data"), Clock::now()));
auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get();
conn.congestionController = std::move(mockCongestionController);
EXPECT_CALL(*rawCongestionController, getWritableBytes())
.WillRepeatedly(Return(10 * kDefaultUDPSendPacketLen));
EXPECT_CALL(
writeCallback_,
onStreamWriteReady(stream->id, 10 * kDefaultUDPSendPacketLen));
PacketNum num = 10;
// Give the conn some headroom.
handleConnWindowUpdate(
conn,
MaxDataFrame(
conn.flowControlState.peerAdvertisedMaxOffset +
1000 * kDefaultUDPSendPacketLen),
num);
transport_->onNetworkData(
SocketAddress("::1", 10000),
NetworkData(IOBuf::copyBuffer("fake data"), Clock::now()));
}
TEST_F(QuicTransportTest, NotifyPendingWriteStreamAsyncStreamBlocked) { TEST_F(QuicTransportTest, NotifyPendingWriteStreamAsyncStreamBlocked) {
auto streamId = transport_->createBidirectionalStream().value(); auto streamId = transport_->createBidirectionalStream().value();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();

View File

@@ -326,6 +326,12 @@ struct TransportSettings {
uint64_t maxReceiveTimestampsPerAckStored{kMaxReceivedPktsTimestampsStored}; uint64_t maxReceiveTimestampsPerAckStored{kMaxReceivedPktsTimestampsStored};
// Close the connection completely if a migration occurs during the handshake. // Close the connection completely if a migration occurs during the handshake.
bool closeIfMigrationDuringHandshake{true}; bool closeIfMigrationDuringHandshake{true};
// Whether to use writable bytes to apply app backpressure via the callbacks
// for the max writable on stream or connection. The value is a multiplier
// for the writable bytes given in the callback, which may be useful for
// allowing cwnd growth. 0 disables. The amount given to callbacks has the
// current amount of stream bytes buffered subtracted from it.
uint8_t backpressureHeadroomFactor{0};
}; };
} // namespace quic } // namespace quic