diff --git a/quic/QuicConstants.cpp b/quic/QuicConstants.cpp index d0f6d56ff..8ba6edec5 100644 --- a/quic/QuicConstants.cpp +++ b/quic/QuicConstants.cpp @@ -125,6 +125,8 @@ std::string_view writeDataReasonString(WriteDataReason reason) { return "Datagram"; case WriteDataReason::NO_WRITE: return "NoWrite"; + case WriteDataReason::BUFFERED_WRITE: + return "BufferedWrite"; } folly::assume_unreachable(); } diff --git a/quic/QuicConstants.h b/quic/QuicConstants.h index 9962adb1b..b3b4b02b8 100644 --- a/quic/QuicConstants.h +++ b/quic/QuicConstants.h @@ -677,6 +677,7 @@ enum class WriteDataReason { PATHCHALLENGE, PING, DATAGRAM, + BUFFERED_WRITE, }; enum class NoWriteReason { diff --git a/quic/api/QuicBatchWriter.cpp b/quic/api/QuicBatchWriter.cpp index 09d2f543c..91336853d 100644 --- a/quic/api/QuicBatchWriter.cpp +++ b/quic/api/QuicBatchWriter.cpp @@ -86,6 +86,50 @@ bool SinglePacketInplaceBatchWriter::empty() const { return buf->length() == 0; } +// SinglePacketBackpressureBatchWriter +SinglePacketBackpressureBatchWriter::SinglePacketBackpressureBatchWriter( + QuicConnectionStateBase& conn) + : conn_(conn) { + // If we have a write to retry from a previous attempt, pick that up. + if (conn_.pendingWriteBatch_.buf) { + buf_.swap(conn_.pendingWriteBatch_.buf); + lastWriteSuccessful_ = false; + } +} + +SinglePacketBackpressureBatchWriter::~SinglePacketBackpressureBatchWriter() { + if (buf_ && !buf_->empty()) { + conn_.pendingWriteBatch_.buf.swap(buf_); + } +} + +void SinglePacketBackpressureBatchWriter::reset() { + // Only clear the buffer if it's been written successfully. + // Otherwise, retain it so it can be retried. + if (lastWriteSuccessful_) { + buf_.reset(nullptr); + } +} + +bool SinglePacketBackpressureBatchWriter::append( + std::unique_ptr&& buf, + size_t /* unused */, + const folly::SocketAddress& /*unused*/, + QuicAsyncUDPSocket* /*unused*/) { + buf_ = std::move(buf); + + // needs to be flushed + return true; +} + +ssize_t SinglePacketBackpressureBatchWriter::write( + QuicAsyncUDPSocket& sock, + const folly::SocketAddress& address) { + auto written = sock.write(address, buf_); + lastWriteSuccessful_ = written > 0; + return written; +} + // SendmmsgPacketBatchWriter SendmmsgPacketBatchWriter::SendmmsgPacketBatchWriter(size_t maxBufs) : maxBufs_(maxBufs) { diff --git a/quic/api/QuicBatchWriter.h b/quic/api/QuicBatchWriter.h index 150958e6f..6b763ca95 100644 --- a/quic/api/QuicBatchWriter.h +++ b/quic/api/QuicBatchWriter.h @@ -122,6 +122,26 @@ class SinglePacketInplaceBatchWriter : public IOBufBatchWriter { QuicConnectionStateBase& conn_; }; +class SinglePacketBackpressureBatchWriter : public IOBufBatchWriter { + public: + explicit SinglePacketBackpressureBatchWriter(QuicConnectionStateBase& conn); + ~SinglePacketBackpressureBatchWriter() override; + + void reset() override; + bool append( + std::unique_ptr&& buf, + size_t size, + const folly::SocketAddress& /*unused*/, + QuicAsyncUDPSocket* /*unused*/) override; + ssize_t write(QuicAsyncUDPSocket& sock, const folly::SocketAddress& address) + override; + + private: + QuicConnectionStateBase& conn_; + // whether the last write attempt was successful. + bool lastWriteSuccessful_{true}; +}; + class SendmmsgPacketBatchWriter : public BatchWriter { public: explicit SendmmsgPacketBatchWriter(size_t maxBufs); diff --git a/quic/api/QuicBatchWriterFactory.cpp b/quic/api/QuicBatchWriterFactory.cpp index 89aff9bec..6e9d9b1fa 100644 --- a/quic/api/QuicBatchWriterFactory.cpp +++ b/quic/api/QuicBatchWriterFactory.cpp @@ -32,11 +32,17 @@ BatchWriterPtr makeSendmmsgGsoBatchWriter(uint32_t batchSize) { BatchWriterPtr BatchWriterFactory::makeBatchWriter( const quic::QuicBatchingMode& batchingMode, uint32_t batchSize, + bool enableBackpressure, DataPathType dataPathType, QuicConnectionStateBase& conn, bool gsoSupported) { return makeBatchWriterHelper( - batchingMode, batchSize, dataPathType, conn, gsoSupported); + batchingMode, + batchSize, + enableBackpressure, + dataPathType, + conn, + gsoSupported); } } // namespace quic diff --git a/quic/api/QuicBatchWriterFactory.h b/quic/api/QuicBatchWriterFactory.h index 90e77b038..f7c3e4579 100644 --- a/quic/api/QuicBatchWriterFactory.h +++ b/quic/api/QuicBatchWriterFactory.h @@ -23,6 +23,7 @@ class BatchWriterFactory { static BatchWriterPtr makeBatchWriter( const quic::QuicBatchingMode& batchingMode, uint32_t batchSize, + bool enableBackpressure, DataPathType dataPathType, QuicConnectionStateBase& conn, bool gsoSupported); @@ -31,12 +32,16 @@ class BatchWriterFactory { static BatchWriterPtr makeBatchWriterHelper( const quic::QuicBatchingMode& batchingMode, uint32_t batchSize, + bool enableBackpressure, DataPathType dataPathType, QuicConnectionStateBase& conn, bool gsoSupported) { switch (batchingMode) { case quic::QuicBatchingMode::BATCHING_MODE_NONE: - if (useSinglePacketInplaceBatchWriter(batchSize, dataPathType)) { + if (enableBackpressure && dataPathType == DataPathType::ChainedMemory && + conn.transportSettings.useSockWritableEvents) { + return BatchWriterPtr(new SinglePacketBackpressureBatchWriter(conn)); + } else if (useSinglePacketInplaceBatchWriter(batchSize, dataPathType)) { return BatchWriterPtr(new SinglePacketInplaceBatchWriter(conn)); } return BatchWriterPtr(new SinglePacketBatchWriter()); diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 272034606..ad28dc698 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -3591,11 +3591,14 @@ void QuicTransportBase::maybeStopWriteLooperAndArmSocketWritableEvent() { !socket_->isWritableCallbackSet()) { // Check if all data has been written and we're not limited by flow // control/congestion control. - bool haveDataToWrite = shouldWriteData(*conn_) != WriteDataReason::NO_WRITE; + auto writeReason = shouldWriteData(*conn_); + bool haveBufferToRetry = writeReason == WriteDataReason::BUFFERED_WRITE; + bool haveNewDataToWrite = + (writeReason != WriteDataReason::NO_WRITE) && !haveBufferToRetry; bool connHasWriteWindow = (conn_->congestionController->getWritableBytes() > 0) && (getSendConnFlowControlBytesAPI(*conn_) > 0); - if (haveDataToWrite && connHasWriteWindow) { + if (haveBufferToRetry || (haveNewDataToWrite && connHasWriteWindow)) { // Re-arm the write event and stop the write // looper. socket_->resumeWrite(this); diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index e708f5d4c..bc32641ba 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -1460,7 +1460,12 @@ WriteQuicDataResult writeConnectionDataToSocket( connection.writeDebugState.noWriteReason = NoWriteReason::WRITE_OK; } - if (!scheduler.hasData()) { + // Note: if a write is pending, it will be taken over by the batch writer when + // it's created. So this check has to be done before creating the batch + // writer. + bool pendingBufferedWrite = hasBufferedDataToWrite(connection); + + if (!scheduler.hasData() && !pendingBufferedWrite) { if (connection.loopDetectorCallback) { connection.writeDebugState.noWriteReason = NoWriteReason::EMPTY_SCHEDULER; } @@ -1492,6 +1497,7 @@ WriteQuicDataResult writeConnectionDataToSocket( auto batchWriter = BatchWriterFactory::makeBatchWriter( connection.transportSettings.batchingMode, connection.transportSettings.maxBatchSize, + connection.transportSettings.enableWriterBackpressure, connection.transportSettings.dataPathType, connection, *connection.gsoSupported); @@ -1506,6 +1512,16 @@ WriteQuicDataResult writeConnectionDataToSocket( connection.statsCallback, happyEyeballsState); + // If we have a pending write to retry. Flush that first and make sure it + // succeeds before scheduling any new data. + if (pendingBufferedWrite) { + if (!ioBufBatch.flush()) { + // Could not flush retried data. Return empty write result and wait for + // next retry. + return {0, 0, 0}; + } + } + auto batchSize = connection.transportSettings.batchingMode == QuicBatchingMode::BATCHING_MODE_NONE ? connection.transportSettings.writeConnectionDataPacketsLimit @@ -1734,6 +1750,11 @@ WriteDataReason shouldWriteData(/*const*/ QuicConnectionStateBase& conn) { QUIC_STATS(conn.statsCallback, onCwndBlocked); return WriteDataReason::NO_WRITE; } + + if (hasBufferedDataToWrite(conn)) { + return WriteDataReason::BUFFERED_WRITE; + } + return hasNonAckDataToWrite(conn); } @@ -1756,6 +1777,10 @@ bool hasAckDataToWrite(const QuicConnectionStateBase& conn) { return writeAcks; } +bool hasBufferedDataToWrite(const QuicConnectionStateBase& conn) { + return (bool)conn.pendingWriteBatch_.buf; +} + WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn) { if (cryptoHasWritableData(conn)) { VLOG(10) << nodeToString(conn.nodeType) diff --git a/quic/api/QuicTransportFunctions.h b/quic/api/QuicTransportFunctions.h index 714c2c35b..b744bdc85 100644 --- a/quic/api/QuicTransportFunctions.h +++ b/quic/api/QuicTransportFunctions.h @@ -154,6 +154,7 @@ uint64_t writeZeroRttDataToSocket( WriteDataReason shouldWriteData(QuicConnectionStateBase& conn); bool hasAckDataToWrite(const QuicConnectionStateBase& conn); WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn); +bool hasBufferedDataToWrite(const QuicConnectionStateBase& conn); /** * Invoked when the written stream data was new stream data. diff --git a/quic/api/test/QuicBatchWriterTest.cpp b/quic/api/test/QuicBatchWriterTest.cpp index 1830956ec..2cfc4d66a 100644 --- a/quic/api/test/QuicBatchWriterTest.cpp +++ b/quic/api/test/QuicBatchWriterTest.cpp @@ -39,6 +39,7 @@ TEST_F(QuicBatchWriterTest, TestBatchingNone) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_NONE, kBatchNum, + false, /* enable backpressure */ DataPathType::ChainedMemory, conn_, gsoSupported_); @@ -70,6 +71,7 @@ TEST_F(QuicBatchWriterTest, TestBatchingGSOBase) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, 1, + false, /* enable backpressure */ DataPathType::ChainedMemory, conn_, gsoSupported_); @@ -99,6 +101,7 @@ TEST_F(QuicBatchWriterTest, TestBatchingGSOLastSmallPacket) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, 1, + false, /* enable backpressure */ DataPathType::ChainedMemory, conn_, gsoSupported_); @@ -140,6 +143,7 @@ TEST_F(QuicBatchWriterTest, TestBatchingGSOLastBigPacket) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, 1, + false, /* enable backpressure */ DataPathType::ChainedMemory, conn_, gsoSupported_); @@ -176,6 +180,7 @@ TEST_F(QuicBatchWriterTest, TestBatchingGSOBatchNum) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, kBatchNum, + false, /* enable backpressure */ DataPathType::ChainedMemory, conn_, gsoSupported_); @@ -213,6 +218,7 @@ TEST_F(QuicBatchWriterTest, TestBatchingSendmmsg) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG, kBatchNum, + false, /* enable backpressure */ DataPathType::ChainedMemory, conn_, gsoSupported_); @@ -255,6 +261,7 @@ TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatchNum) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO, kBatchNum, + false, /* enable backpressure */ DataPathType::ChainedMemory, conn_, gsoSupported_); @@ -300,6 +307,7 @@ TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatcBigSmallPacket) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO, 3 * kBatchNum, + false, /* enable backpressure */ DataPathType::ChainedMemory, conn_, gsoSupported_); @@ -347,6 +355,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterNeedsFlush) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, batchSize, + false, /* enable backpressure */ DataPathType::ContinuousMemory, conn_, gsoSupported_); @@ -369,6 +378,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterAppendLimit) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, batchSize, + false, /* enable backpressure */ DataPathType::ContinuousMemory, conn_, gsoSupported_); @@ -399,6 +409,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterAppendSmaller) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, batchSize, + false, /* enable backpressure */ DataPathType::ContinuousMemory, conn_, gsoSupported_); @@ -433,6 +444,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterWriteAll) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, batchSize, + false, /* enable backpressure */ DataPathType::ContinuousMemory, conn_, gsoSupported_); @@ -481,6 +493,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterWriteOne) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, batchSize, + false, /* enable backpressure */ DataPathType::ContinuousMemory, conn_, gsoSupported_); @@ -521,6 +534,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterLastOneTooBig) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, batchSize, + false, /* enable backpressure */ DataPathType::ContinuousMemory, conn_, gsoSupported_); @@ -568,6 +582,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterBufResidueCheck) { auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( quic::QuicBatchingMode::BATCHING_MODE_GSO, batchSize, + false, /* enable backpressure */ DataPathType::ContinuousMemory, conn_, gsoSupported_); @@ -614,6 +629,7 @@ class SinglePacketInplaceBatchWriterTest : public ::testing::Test { return quic::BatchWriterFactory::makeBatchWriter( batchingMode, conn_.transportSettings.maxBatchSize, + conn_.transportSettings.enableWriterBackpressure, conn_.transportSettings.dataPathType, conn_, false /* gsoSupported_ */); @@ -747,5 +763,104 @@ TEST_F(SinglePacketInplaceBatchWriterTest, TestWrite) { EXPECT_TRUE(batchWriter->empty()); } +struct SinglePacketBackpressureBatchWriterTest : public ::testing::Test { + SinglePacketBackpressureBatchWriterTest() + : conn_(FizzServerQuicHandshakeContext::Builder().build()), + qEvb_(std::make_shared(&evb_)), + sock_(qEvb_) { + conn_.transportSettings.dataPathType = DataPathType::ChainedMemory; + conn_.transportSettings.batchingMode = QuicBatchingMode::BATCHING_MODE_NONE; + conn_.transportSettings.maxBatchSize = 1; + conn_.transportSettings.enableWriterBackpressure = true; + conn_.transportSettings.useSockWritableEvents = true; + } + + BatchWriterPtr makeBatchWriter() { + return quic::BatchWriterFactory::makeBatchWriter( + conn_.transportSettings.batchingMode, + conn_.transportSettings.maxBatchSize, + conn_.transportSettings.enableWriterBackpressure, + conn_.transportSettings.dataPathType, + conn_, + false /* gsoSupported */); + } + + protected: + QuicServerConnectionState conn_; + folly::EventBase evb_; + std::shared_ptr qEvb_; + quic::test::MockAsyncUDPSocket sock_; +}; + +TEST_F(SinglePacketBackpressureBatchWriterTest, TestAppendRequestsFlush) { + auto batchWriter = makeBatchWriter(); + CHECK(batchWriter); + CHECK(dynamic_cast( + batchWriter.get())); + EXPECT_TRUE(batchWriter->empty()); + + auto buf = folly::IOBuf::copyBuffer("append attempt"); + EXPECT_TRUE(batchWriter->append( + std::move(buf), + buf->computeChainDataLength(), + folly::SocketAddress(), + &sock_)); +} + +TEST_F(SinglePacketBackpressureBatchWriterTest, TestFailedWriteCachedOnEAGAIN) { + auto batchWriter = makeBatchWriter(); + CHECK(batchWriter); + CHECK(dynamic_cast( + batchWriter.get())); + EXPECT_TRUE(batchWriter->empty()); + + std::string testString = "append attempt"; + auto buf = folly::IOBuf::copyBuffer(testString); + + EXPECT_TRUE(batchWriter->append( + std::move(buf), + buf->computeChainDataLength(), + folly::SocketAddress(), + &sock_)); + + EXPECT_CALL(sock_, write(_, _)) + .Times(1) + .WillOnce(Invoke([&](const auto& /* addr */, + const std::unique_ptr& /*buf*/) { + errno = EAGAIN; + return 0; + })); + // The write fails + EXPECT_EQ(batchWriter->write(sock_, folly::SocketAddress()), 0); + + // Resetting does not clear the cached buffer from the writer but the buffer + // is not yet cached in the transport. + batchWriter->reset(); + EXPECT_FALSE(conn_.pendingWriteBatch_.buf); + + // Destroying the writer caches the buffer in the transport. + batchWriter = nullptr; + EXPECT_TRUE(conn_.pendingWriteBatch_.buf); + + // A new batch writer picks up the cached buffer from the transport + batchWriter = makeBatchWriter(); + EXPECT_FALSE(conn_.pendingWriteBatch_.buf); + + // The write succeeds + EXPECT_CALL(sock_, write(_, _)) + .Times(1) + .WillOnce(Invoke([&](const auto& /* addr */, + const std::unique_ptr& buf) { + return buf->computeChainDataLength(); + })); + EXPECT_EQ( + batchWriter->write(sock_, folly::SocketAddress()), testString.size()); + + // Nothing is cached in the transport after the writer is reset and destroyed. + batchWriter->reset(); + batchWriter = nullptr; + EXPECT_FALSE(conn_.pendingWriteBatch_.buf); +} + } // namespace testing } // namespace quic diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 48cf255b4..12e4db795 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -334,6 +334,11 @@ class TestQuicTransport conn_->transportSettings.writeConnectionDataPacketsLimit); } + // This is to expose the protected pacedWriteDataToSocket() function + void pacedWriteDataToSocketThroughTransportBase() { + pacedWriteDataToSocket(); + } + bool hasWriteCipher() const { return conn_->oneRttWriteCipher != nullptr; } @@ -4825,5 +4830,81 @@ TEST_P(QuicTransportImplTestBase, TestOnSocketWritable) { transport.reset(); } +TEST_P( + QuicTransportImplTestBase, + TestBackpressureWriterArmsSocketWritableEvent) { + transport->setServerConnectionId(); + auto transportSettings = transport->getTransportSettings(); + + transportSettings.useSockWritableEvents = true; + transportSettings.batchingMode = QuicBatchingMode::BATCHING_MODE_NONE; + transportSettings.maxBatchSize = 1; + transportSettings.dataPathType = DataPathType::ChainedMemory; + transportSettings.enableWriterBackpressure = true; + + transport->setTransportSettings(transportSettings); + transport->getConnectionState().streamManager->refreshTransportSettings( + transportSettings); + + transport->transportConn->oneRttWriteCipher = test::createNoOpAead(); + + // Create a stream with outgoing data. + auto streamId = transport->createBidirectionalStream().value(); + const auto& conn = transport->transportConn; + auto stream = transport->getStream(streamId); + std::string testString = "hello"; + stream->writeBuffer.append(IOBuf::copyBuffer(testString)); + conn->flowControlState.sumCurStreamBufferLen = testString.length(); + + // Insert streamId into the list. + conn->streamManager->addWritable(*stream); + conn->streamManager->updateWritableStreams(*stream); + + // Mock arming the write callback + bool writeCallbackArmed = false; + EXPECT_CALL(*socketPtr, isWritableCallbackSet()).WillRepeatedly(Invoke([&]() { + return writeCallbackArmed; + })); + EXPECT_CALL(*socketPtr, resumeWrite(_)) + .WillOnce(Invoke([&](QuicAsyncUDPSocket::WriteCallback*) { + writeCallbackArmed = true; + return folly::makeExpected(folly::Unit()); + })); + + // Fail the first write loop. + EXPECT_CALL(*socketPtr, write(_, _)) + .Times(2) // We attempt to flush the batch twice inside the write loop. + // Fail both. + .WillRepeatedly(Invoke([&](const auto& /* addr */, + const std::unique_ptr& /*buf*/) { + errno = EAGAIN; + return 0; + })); + + transport->writeLooper()->run(true /* thisIteration */); + EXPECT_TRUE(transport->writeLooper()->isRunning()); + + // A write attempt will cache the failed write, stop the write looper, and arm + // the write callback. + transport->pacedWriteDataToSocketThroughTransportBase(); + + // The transport has cached the failed write buffer. + EXPECT_TRUE(conn->pendingWriteBatch_.buf); + // Write looper stopped. + EXPECT_FALSE(transport->writeLooper()->isRunning()); + // Write callback armed. + EXPECT_TRUE(writeCallbackArmed); + + // Reset will make one write attempt. We don't care what happens to it + EXPECT_CALL(*socketPtr, write(_, _)) + .Times(1) + .WillRepeatedly(Invoke([&](const auto& /* addr */, + const std::unique_ptr& buf) { + errno = 0; + return buf->computeChainDataLength(); + })); + transport.reset(); +} + } // namespace test } // namespace quic diff --git a/quic/state/StateData.h b/quic/state/StateData.h index eae80ed59..d391856ea 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -526,6 +526,15 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction { // Current state of flow control. ConnectionFlowControlState flowControlState; + struct PendingWriteBatch { + std::unique_ptr buf; + // More fields will be needed here for other batch writer types. + }; + + // A write batch that was attempted but did not succeed. + // This is only used by the SinglePacketBackpressureBatchWriter. + PendingWriteBatch pendingWriteBatch_; + // The outstanding path challenge folly::Optional outstandingPathValidation; diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index f4755032c..73d7f2060 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -379,6 +379,10 @@ struct TransportSettings { uint64_t cwndModerateJumpstart{48000}; uint64_t cwndStrongJumpstart{72000}; bool useSockWritableEvents{false}; + // use backpressure single packet batch writer. Only works for + // QuicBatchingMode::BATCHING_MODE_NONE and DataPathType::ChainedMemory + // and requires useSockWritableEvents to be enabled. + bool enableWriterBackpressure{false}; }; } // namespace quic