mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-08 09:42:06 +03:00
Switch to chained memory data path if GSO is not supported
Summary: Check for GSO support once before the first write, cache the value and use it throughout the conn lifetime. If GSO is not supported, ensure we use chained memory write path. Reviewed By: mjoras Differential Revision: D40240513 fbshipit-source-id: b699b4633246f3c15d2be7b39580686e44c2aab3
This commit is contained in:
committed by
Facebook GitHub Bot
parent
15c110da72
commit
c6d9731247
@@ -516,17 +516,17 @@ void BatchWriterDeleter::operator()(BatchWriter* batchWriter) {
|
||||
|
||||
// BatchWriterFactory
|
||||
BatchWriterPtr BatchWriterFactory::makeBatchWriter(
|
||||
folly::AsyncUDPSocket& sock,
|
||||
const quic::QuicBatchingMode& batchingMode,
|
||||
uint32_t batchSize,
|
||||
bool useThreadLocal,
|
||||
const std::chrono::microseconds& threadLocalDelay,
|
||||
DataPathType dataPathType,
|
||||
QuicConnectionStateBase& conn) {
|
||||
QuicConnectionStateBase& conn,
|
||||
bool gsoSupported) {
|
||||
#if USE_THREAD_LOCAL_BATCH_WRITER
|
||||
if (useThreadLocal &&
|
||||
(batchingMode == quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO) &&
|
||||
sock.getGSO() >= 0) {
|
||||
gsoSupported) {
|
||||
BatchWriterPtr ret(
|
||||
ThreadLocalBatchWriterCache::getThreadLocalInstance().getCachedWriter(
|
||||
batchingMode, threadLocalDelay));
|
||||
@@ -545,7 +545,7 @@ BatchWriterPtr BatchWriterFactory::makeBatchWriter(
|
||||
case quic::QuicBatchingMode::BATCHING_MODE_NONE:
|
||||
return BatchWriterPtr(new SinglePacketBatchWriter());
|
||||
case quic::QuicBatchingMode::BATCHING_MODE_GSO: {
|
||||
if (sock.getGSO() >= 0) {
|
||||
if (gsoSupported) {
|
||||
if (dataPathType == DataPathType::ChainedMemory) {
|
||||
return BatchWriterPtr(new GSOPacketBatchWriter(batchSize));
|
||||
}
|
||||
@@ -557,7 +557,7 @@ BatchWriterPtr BatchWriterFactory::makeBatchWriter(
|
||||
case quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG:
|
||||
return BatchWriterPtr(new SendmmsgPacketBatchWriter(batchSize));
|
||||
case quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO: {
|
||||
if (sock.getGSO() >= 0) {
|
||||
if (gsoSupported) {
|
||||
return BatchWriterPtr(new SendmmsgGSOPacketBatchWriter(batchSize));
|
||||
}
|
||||
|
||||
|
@@ -253,13 +253,13 @@ using BatchWriterPtr = std::unique_ptr<BatchWriter, BatchWriterDeleter>;
|
||||
class BatchWriterFactory {
|
||||
public:
|
||||
static BatchWriterPtr makeBatchWriter(
|
||||
folly::AsyncUDPSocket& sock,
|
||||
const quic::QuicBatchingMode& batchingMode,
|
||||
uint32_t batchSize,
|
||||
bool useThreadLocal,
|
||||
const std::chrono::microseconds& threadLocalDelay,
|
||||
DataPathType dataPathType,
|
||||
QuicConnectionStateBase& conn);
|
||||
QuicConnectionStateBase& conn,
|
||||
bool gsoSupported);
|
||||
};
|
||||
|
||||
} // namespace quic
|
||||
|
@@ -1386,14 +1386,27 @@ WriteQuicDataResult writeConnectionDataToSocket(
|
||||
<< " writing data using scheduler=" << scheduler.name() << " "
|
||||
<< connection;
|
||||
|
||||
if (!connection.gsoSupported.hasValue()) {
|
||||
connection.gsoSupported = sock.getGSO() >= 0;
|
||||
if (!*connection.gsoSupported &&
|
||||
(connection.transportSettings.dataPathType ==
|
||||
DataPathType::ContinuousMemory)) {
|
||||
// Change data path type to DataPathType::ChainedMemory.
|
||||
// Continuous memory data path is only supported with working GSO.
|
||||
LOG(ERROR) << "Switching data path to ChainedMemory as "
|
||||
<< "GSO is not supported on the socket";
|
||||
connection.transportSettings.dataPathType = DataPathType::ChainedMemory;
|
||||
}
|
||||
}
|
||||
|
||||
auto batchWriter = BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
connection.transportSettings.batchingMode,
|
||||
connection.transportSettings.maxBatchSize,
|
||||
connection.transportSettings.useThreadLocalBatching,
|
||||
connection.transportSettings.threadLocalDelay,
|
||||
connection.transportSettings.dataPathType,
|
||||
connection);
|
||||
connection,
|
||||
*connection.gsoSupported);
|
||||
|
||||
auto happyEyeballsState = connection.nodeType == QuicNodeType::Server
|
||||
? nullptr
|
||||
|
@@ -30,6 +30,7 @@ struct QuicBatchWriterTest : public ::testing::Test,
|
||||
|
||||
protected:
|
||||
QuicServerConnectionState conn_;
|
||||
bool gsoSupported_{false};
|
||||
};
|
||||
|
||||
TEST_P(QuicBatchWriterTest, TestBatchingNone) {
|
||||
@@ -40,13 +41,13 @@ TEST_P(QuicBatchWriterTest, TestBatchingNone) {
|
||||
sock.bind(folly::SocketAddress("127.0.0.1", 0));
|
||||
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_NONE,
|
||||
kBatchNum,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ChainedMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
std::string strTest('A', kStrLen);
|
||||
|
||||
@@ -69,20 +70,21 @@ TEST_P(QuicBatchWriterTest, TestBatchingGSOBase) {
|
||||
folly::AsyncUDPSocket sock(&evb);
|
||||
sock.setReuseAddr(false);
|
||||
sock.bind(folly::SocketAddress("127.0.0.1", 0));
|
||||
gsoSupported_ = sock.getGSO() >= 0;
|
||||
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
1,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ChainedMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
std::string strTest(kStrLen, 'A');
|
||||
// if GSO is not available, just test we've got a regular
|
||||
// batch writer
|
||||
if (sock.getGSO() < 0) {
|
||||
if (!gsoSupported_) {
|
||||
CHECK(batchWriter->empty());
|
||||
CHECK_EQ(batchWriter->size(), 0);
|
||||
auto buf = folly::IOBuf::copyBuffer(strTest);
|
||||
@@ -98,19 +100,20 @@ TEST_P(QuicBatchWriterTest, TestBatchingGSOLastSmallPacket) {
|
||||
folly::AsyncUDPSocket sock(&evb);
|
||||
sock.setReuseAddr(false);
|
||||
sock.bind(folly::SocketAddress("127.0.0.1", 0));
|
||||
gsoSupported_ = sock.getGSO() >= 0;
|
||||
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
1,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ChainedMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
std::string strTest;
|
||||
// only if GSO is available
|
||||
if (sock.getGSO() >= 0) {
|
||||
if (gsoSupported_) {
|
||||
// run multiple loops
|
||||
for (size_t i = 0; i < kNumLoops; i++) {
|
||||
// batch kStrLen, kStrLenLT
|
||||
@@ -139,19 +142,20 @@ TEST_P(QuicBatchWriterTest, TestBatchingGSOLastBigPacket) {
|
||||
folly::AsyncUDPSocket sock(&evb);
|
||||
sock.setReuseAddr(false);
|
||||
sock.bind(folly::SocketAddress("127.0.0.1", 0));
|
||||
gsoSupported_ = sock.getGSO() >= 0;
|
||||
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
1,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ChainedMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
std::string strTest;
|
||||
// only if GSO is available
|
||||
if (sock.getGSO() >= 0) {
|
||||
if (gsoSupported_) {
|
||||
// run multiple loops
|
||||
for (size_t i = 0; i < kNumLoops; i++) {
|
||||
// try to batch kStrLen, kStrLenGT
|
||||
@@ -175,20 +179,21 @@ TEST_P(QuicBatchWriterTest, TestBatchingGSOBatchNum) {
|
||||
folly::AsyncUDPSocket sock(&evb);
|
||||
sock.setReuseAddr(false);
|
||||
sock.bind(folly::SocketAddress("127.0.0.1", 0));
|
||||
gsoSupported_ = sock.getGSO() >= 0;
|
||||
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
kBatchNum,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ChainedMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
std::string strTest(kStrLen, 'A');
|
||||
// if GSO is not available, just test we've got a regular
|
||||
// batch writer
|
||||
if (sock.getGSO() >= 0) {
|
||||
if (gsoSupported_) {
|
||||
// run multiple loops
|
||||
for (size_t i = 0; i < kNumLoops; i++) {
|
||||
// try to batch up to kBatchNum
|
||||
@@ -222,13 +227,13 @@ TEST_P(QuicBatchWriterTest, TestBatchingSendmmsg) {
|
||||
sock.bind(folly::SocketAddress("127.0.0.1", 0));
|
||||
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG,
|
||||
kBatchNum,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ChainedMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
std::string strTest(kStrLen, 'A');
|
||||
|
||||
@@ -262,20 +267,21 @@ TEST_P(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatchNum) {
|
||||
folly::AsyncUDPSocket sock(&evb);
|
||||
sock.setReuseAddr(false);
|
||||
sock.bind(folly::SocketAddress("127.0.0.1", 0));
|
||||
gsoSupported_ = sock.getGSO() >= 0;
|
||||
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
|
||||
kBatchNum,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ChainedMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
std::string strTest(kStrLen, 'A');
|
||||
// if GSO is not available, just test we've got a regular
|
||||
// batch writer
|
||||
if (sock.getGSO() >= 0) {
|
||||
if (gsoSupported_) {
|
||||
// run multiple loops
|
||||
for (size_t i = 0; i < kNumLoops; i++) {
|
||||
// try to batch up to kBatchNum
|
||||
@@ -307,20 +313,21 @@ TEST_P(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatcBigSmallPacket) {
|
||||
folly::AsyncUDPSocket sock(&evb);
|
||||
sock.setReuseAddr(false);
|
||||
sock.bind(folly::SocketAddress("127.0.0.1", 0));
|
||||
gsoSupported_ = sock.getGSO() >= 0;
|
||||
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO,
|
||||
3 * kBatchNum,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ChainedMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
std::string strTest(kStrLen, 'A');
|
||||
// if GSO is not available, just test we've got a regular
|
||||
// batch writer
|
||||
if (sock.getGSO() >= 0) {
|
||||
if (gsoSupported_) {
|
||||
// run multiple loops
|
||||
for (size_t i = 0; i < kNumLoops; i++) {
|
||||
// try to batch up to kBatchNum
|
||||
@@ -355,19 +362,19 @@ TEST_P(QuicBatchWriterTest, InplaceWriterNeedsFlush) {
|
||||
bool useThreadLocal = GetParam();
|
||||
folly::EventBase evb;
|
||||
folly::test::MockAsyncUDPSocket sock(&evb);
|
||||
EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1));
|
||||
gsoSupported_ = true;
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
batchSize,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ContinuousMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
EXPECT_FALSE(batchWriter->needsFlush(1000));
|
||||
|
||||
@@ -382,19 +389,19 @@ TEST_P(QuicBatchWriterTest, InplaceWriterAppendLimit) {
|
||||
bool useThreadLocal = GetParam();
|
||||
folly::EventBase evb;
|
||||
folly::test::MockAsyncUDPSocket sock(&evb);
|
||||
EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1));
|
||||
gsoSupported_ = true;
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
batchSize,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ContinuousMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
EXPECT_FALSE(batchWriter->needsFlush(1000));
|
||||
|
||||
@@ -417,19 +424,19 @@ TEST_P(QuicBatchWriterTest, InplaceWriterAppendSmaller) {
|
||||
bool useThreadLocal = GetParam();
|
||||
folly::EventBase evb;
|
||||
folly::test::MockAsyncUDPSocket sock(&evb);
|
||||
EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1));
|
||||
gsoSupported_ = true;
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
batchSize,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ContinuousMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
EXPECT_FALSE(batchWriter->needsFlush(1000));
|
||||
|
||||
@@ -456,15 +463,15 @@ TEST_P(QuicBatchWriterTest, InplaceWriterWriteAll) {
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1));
|
||||
gsoSupported_ = true;
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
batchSize,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ContinuousMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
ASSERT_FALSE(batchWriter->needsFlush(1000));
|
||||
|
||||
@@ -505,15 +512,15 @@ TEST_P(QuicBatchWriterTest, InplaceWriterWriteOne) {
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1));
|
||||
gsoSupported_ = true;
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
batchSize,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ContinuousMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
CHECK(batchWriter);
|
||||
ASSERT_FALSE(batchWriter->needsFlush(1000));
|
||||
|
||||
@@ -545,15 +552,15 @@ TEST_P(QuicBatchWriterTest, InplaceWriterLastOneTooBig) {
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1));
|
||||
gsoSupported_ = true;
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
batchSize,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ContinuousMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
for (size_t i = 0; i < 5; i++) {
|
||||
auto buf = bufAccessor->obtain();
|
||||
buf->append(700);
|
||||
@@ -587,7 +594,7 @@ TEST_P(QuicBatchWriterTest, InplaceWriterBufResidueCheck) {
|
||||
bool useThreadLocal = GetParam();
|
||||
folly::EventBase evb;
|
||||
folly::test::MockAsyncUDPSocket sock(&evb);
|
||||
EXPECT_CALL(sock, getGSO()).WillRepeatedly(Return(1));
|
||||
gsoSupported_ = true;
|
||||
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
@@ -595,13 +602,13 @@ TEST_P(QuicBatchWriterTest, InplaceWriterBufResidueCheck) {
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
conn_.udpSendPacketLen = 1000;
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
sock,
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
batchSize,
|
||||
useThreadLocal,
|
||||
quic::kDefaultThreadLocalDelay,
|
||||
DataPathType::ContinuousMemory,
|
||||
conn_);
|
||||
conn_,
|
||||
gsoSupported_);
|
||||
auto buf = bufAccessor->obtain();
|
||||
folly::IOBuf* rawBuf = buf.get();
|
||||
bufAccessor->release(std::move(buf));
|
||||
|
@@ -749,6 +749,9 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
|
||||
|
||||
// Sequence number to use for the next ACK_FREQUENCY frame
|
||||
uint64_t nextAckFrequencyFrameSequenceNumber{0};
|
||||
|
||||
// GSO supported on conn.
|
||||
folly::Optional<bool> gsoSupported;
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st);
|
||||
|
Reference in New Issue
Block a user