mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-07-30 14:43:05 +03:00
Less direct Buf access in BufAccessor
Summary: **Context** The `BufAccessor` is used to access a contiguous section of memory. Right now, it works with a `Buf` under the hood. **Overall plan** The plan is to change the `BufAccessor` to use a `uint8_t*` instead. Since we're using section of contiguous memory, there's no need to use a chained buffer abstraction here. This'll move us closer to deprecating the usage `folly::IOBuf`. **What this diff is doing** Most use cases of the `BufAccessor` look like the following: ``` auto buf = bufAccessor.obtain(); // Do something with buf, like calling trimEnd bufAccessor.release(buf) ``` I'm adding APIs to the `BufAccessor` so that there's no need to `obtain()` and `release()` the `Buf`. We'd instead just call an API on the `BufAccessor`, which would call that same API on the underlying `folly::IOBuf`. Later on, we'll change the `BufAccessor` to use a `uint8_t*` under the hood. I'm currently leaving in the `obtain()`, `release()`, and `buf()` APIs because Fizz and the AsyncUDPSocket expect `folly::IOBuf` as inputs in many of their APIs. Once those callsites are migrated off `folly::IOBuf`, we can remove these APIs. Reviewed By: mjoras Differential Revision: D60973166 fbshipit-source-id: 52aa3541d0c4878c7ee8525d70ac280508b61e24
This commit is contained in:
committed by
Facebook GitHub Bot
parent
9216eed3d8
commit
a84708be4b
@ -37,9 +37,7 @@ ssize_t SinglePacketBatchWriter::write(
|
||||
|
||||
// SinglePacketInplaceBatchWriter
|
||||
void SinglePacketInplaceBatchWriter::reset() {
|
||||
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
|
||||
auto& buf = scopedBufAccessor.buf();
|
||||
buf->clear();
|
||||
conn_.bufAccessor->clear();
|
||||
}
|
||||
|
||||
bool SinglePacketInplaceBatchWriter::append(
|
||||
@ -54,18 +52,15 @@ bool SinglePacketInplaceBatchWriter::append(
|
||||
ssize_t SinglePacketInplaceBatchWriter::write(
|
||||
QuicAsyncUDPSocket& sock,
|
||||
const folly::SocketAddress& address) {
|
||||
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
|
||||
auto& buf = scopedBufAccessor.buf();
|
||||
CHECK(!buf->isChained());
|
||||
auto& buf = conn_.bufAccessor->buf();
|
||||
CHECK(!conn_.bufAccessor->isChained());
|
||||
auto ret = sock.write(address, buf);
|
||||
buf->clear();
|
||||
conn_.bufAccessor->clear();
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool SinglePacketInplaceBatchWriter::empty() const {
|
||||
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
|
||||
auto& buf = scopedBufAccessor.buf();
|
||||
return buf->length() == 0;
|
||||
return conn_.bufAccessor->length() == 0;
|
||||
}
|
||||
|
||||
// SinglePacketBackpressureBatchWriter
|
||||
|
@ -6,6 +6,7 @@
|
||||
*/
|
||||
|
||||
#include <quic/api/QuicGsoBatchWriters.h>
|
||||
#include <quic/common/BufAccessor.h>
|
||||
#include <quic/common/udpsocket/QuicAsyncUDPSocket.h>
|
||||
|
||||
namespace {
|
||||
@ -106,8 +107,7 @@ bool GSOInplacePacketBatchWriter::append(
|
||||
const folly::SocketAddress& /* addr */,
|
||||
QuicAsyncUDPSocket* /* sock */) {
|
||||
CHECK(!needsFlush(size));
|
||||
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
|
||||
auto& buf = scopedBufAccessor.buf();
|
||||
auto& buf = conn_.bufAccessor->buf();
|
||||
if (!lastPacketEnd_) {
|
||||
CHECK(prevSize_ == 0 && numPackets_ == 0);
|
||||
prevSize_ = size;
|
||||
@ -134,9 +134,8 @@ bool GSOInplacePacketBatchWriter::append(
|
||||
ssize_t GSOInplacePacketBatchWriter::write(
|
||||
QuicAsyncUDPSocket& sock,
|
||||
const folly::SocketAddress& address) {
|
||||
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
|
||||
CHECK(lastPacketEnd_);
|
||||
auto& buf = scopedBufAccessor.buf();
|
||||
auto& buf = conn_.bufAccessor->buf();
|
||||
CHECK(!buf->isChained());
|
||||
CHECK(lastPacketEnd_ >= buf->data() && lastPacketEnd_ <= buf->tail())
|
||||
<< "lastPacketEnd_=" << (uintptr_t)lastPacketEnd_
|
||||
@ -198,11 +197,11 @@ size_t GSOInplacePacketBatchWriter::size() const {
|
||||
if (empty()) {
|
||||
return 0;
|
||||
}
|
||||
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
|
||||
CHECK(lastPacketEnd_);
|
||||
auto& buf = scopedBufAccessor.buf();
|
||||
CHECK(lastPacketEnd_ >= buf->data() && lastPacketEnd_ <= buf->tail());
|
||||
size_t ret = lastPacketEnd_ - buf->data();
|
||||
CHECK(
|
||||
lastPacketEnd_ >= conn_.bufAccessor->data() &&
|
||||
lastPacketEnd_ <= conn_.bufAccessor->tail());
|
||||
size_t ret = lastPacketEnd_ - conn_.bufAccessor->data();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
#include <quic/QuicConstants.h>
|
||||
#include <quic/api/QuicPacketScheduler.h>
|
||||
#include <quic/common/BufAccessor.h>
|
||||
#include <quic/flowcontrol/QuicFlowController.h>
|
||||
#include <cstdint>
|
||||
|
||||
@ -902,8 +903,7 @@ SchedulingResult CloningScheduler::scheduleFramesForPacket(
|
||||
size_t prevSize = 0;
|
||||
if (conn_.transportSettings.dataPathType ==
|
||||
DataPathType::ContinuousMemory) {
|
||||
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
|
||||
prevSize = scopedBufAccessor.buf()->length();
|
||||
prevSize = conn_.bufAccessor->length();
|
||||
}
|
||||
// Reusing the same builder throughout loop bodies will lead to frames
|
||||
// belong to different original packets being written into the same clone
|
||||
@ -968,9 +968,7 @@ SchedulingResult CloningScheduler::scheduleFramesForPacket(
|
||||
// when peer fail to parse them.
|
||||
internalBuilder.reset();
|
||||
CHECK(conn_.bufAccessor && conn_.bufAccessor->ownsBuffer());
|
||||
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
|
||||
auto& buf = scopedBufAccessor.buf();
|
||||
buf->trimEnd(buf->length() - prevSize);
|
||||
conn_.bufAccessor->trimEnd(conn_.bufAccessor->length() - prevSize);
|
||||
}
|
||||
}
|
||||
return SchedulingResult(none, none, 0);
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <quic/codec/QuicPacketBuilder.h>
|
||||
#include <quic/codec/QuicWriteCodec.h>
|
||||
#include <quic/codec/Types.h>
|
||||
#include <quic/common/BufAccessor.h>
|
||||
#include <quic/flowcontrol/QuicFlowController.h>
|
||||
#include <quic/happyeyeballs/QuicHappyEyeballsFunctions.h>
|
||||
|
||||
@ -231,14 +232,11 @@ DataPathResult continuousMemoryBuildScheduleEncrypt(
|
||||
IOBufQuicBatch& ioBufBatch,
|
||||
const Aead& aead,
|
||||
const PacketNumberCipher& headerCipher) {
|
||||
auto buf = connection.bufAccessor->obtain();
|
||||
auto prevSize = buf->length();
|
||||
connection.bufAccessor->release(std::move(buf));
|
||||
auto prevSize = connection.bufAccessor->length();
|
||||
|
||||
auto rollbackBuf = [&]() {
|
||||
auto buf = connection.bufAccessor->obtain();
|
||||
buf->trimEnd(buf->length() - prevSize);
|
||||
connection.bufAccessor->release(std::move(buf));
|
||||
connection.bufAccessor->trimEnd(
|
||||
connection.bufAccessor->length() - prevSize);
|
||||
};
|
||||
|
||||
// It's the scheduler's job to invoke encode header
|
||||
@ -272,16 +270,17 @@ DataPathResult continuousMemoryBuildScheduleEncrypt(
|
||||
}
|
||||
CHECK(!packet->header.isChained());
|
||||
auto headerLen = packet->header.length();
|
||||
buf = connection.bufAccessor->obtain();
|
||||
CHECK(
|
||||
packet->body.data() > buf->data() && packet->body.tail() <= buf->tail());
|
||||
packet->body.data() > connection.bufAccessor->data() &&
|
||||
packet->body.tail() <= connection.bufAccessor->tail());
|
||||
CHECK(
|
||||
packet->header.data() >= buf->data() &&
|
||||
packet->header.tail() < buf->tail());
|
||||
packet->header.data() >= connection.bufAccessor->data() &&
|
||||
packet->header.tail() < connection.bufAccessor->tail());
|
||||
// Trim off everything before the current packet, and the header length, so
|
||||
// buf's data starts from the body part of buf.
|
||||
buf->trimStart(prevSize + headerLen);
|
||||
connection.bufAccessor->trimStart(prevSize + headerLen);
|
||||
// buf and packetBuf is actually the same.
|
||||
auto buf = connection.bufAccessor->obtain();
|
||||
auto packetBuf =
|
||||
aead.inplaceEncrypt(std::move(buf), &packet->header, packetNum);
|
||||
CHECK(packetBuf->headroom() == headerLen + prevSize);
|
||||
@ -1627,9 +1626,9 @@ WriteQuicDataResult writeConnectionDataToSocket(
|
||||
if (connection.transportSettings.dataPathType ==
|
||||
DataPathType::ContinuousMemory) {
|
||||
CHECK(connection.bufAccessor->ownsBuffer());
|
||||
auto buf = connection.bufAccessor->obtain();
|
||||
CHECK(buf->length() == 0 && buf->headroom() == 0);
|
||||
connection.bufAccessor->release(std::move(buf));
|
||||
CHECK(
|
||||
connection.bufAccessor->length() == 0 &&
|
||||
connection.bufAccessor->headroom() == 0);
|
||||
}
|
||||
return {ioBufBatch.getPktSent(), 0, bytesWritten};
|
||||
}
|
||||
|
@ -350,7 +350,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterNeedsFlush) {
|
||||
gsoSupported_ = true;
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
@ -373,7 +373,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterAppendLimit) {
|
||||
gsoSupported_ = true;
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
@ -404,7 +404,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterAppendSmaller) {
|
||||
gsoSupported_ = true;
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
quic::QuicBatchingMode::BATCHING_MODE_GSO,
|
||||
@ -438,7 +438,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterWriteAll) {
|
||||
quic::test::MockAsyncUDPSocket sock(qEvb);
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
gsoSupported_ = true;
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
@ -487,7 +487,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterWriteOne) {
|
||||
quic::test::MockAsyncUDPSocket sock(qEvb);
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
gsoSupported_ = true;
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
@ -528,7 +528,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterLastOneTooBig) {
|
||||
quic::test::MockAsyncUDPSocket sock(qEvb);
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
gsoSupported_ = true;
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
@ -576,7 +576,7 @@ TEST_F(QuicBatchWriterTest, InplaceWriterBufResidueCheck) {
|
||||
|
||||
uint32_t batchSize = 20;
|
||||
auto bufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
std::make_unique<BufAccessor>(conn_.udpSendPacketLen * batchSize);
|
||||
conn_.bufAccessor = bufAccessor.get();
|
||||
conn_.udpSendPacketLen = 1000;
|
||||
auto batchWriter = quic::BatchWriterFactory::makeBatchWriter(
|
||||
@ -618,8 +618,7 @@ class SinglePacketInplaceBatchWriterTest : public ::testing::Test {
|
||||
: conn_(FizzServerQuicHandshakeContext::Builder().build()) {}
|
||||
|
||||
void SetUp() override {
|
||||
bufAccessor_ =
|
||||
std::make_unique<quic::SimpleBufAccessor>(conn_.udpSendPacketLen);
|
||||
bufAccessor_ = std::make_unique<quic::BufAccessor>(conn_.udpSendPacketLen);
|
||||
conn_.bufAccessor = bufAccessor_.get();
|
||||
}
|
||||
|
||||
@ -641,7 +640,7 @@ class SinglePacketInplaceBatchWriterTest : public ::testing::Test {
|
||||
}
|
||||
|
||||
protected:
|
||||
std::unique_ptr<quic::SimpleBufAccessor> bufAccessor_;
|
||||
std::unique_ptr<quic::BufAccessor> bufAccessor_;
|
||||
QuicServerConnectionState conn_;
|
||||
};
|
||||
|
||||
|
@ -912,7 +912,7 @@ TEST_F(QuicPacketSchedulerTest, CloningSchedulerWithInplaceBuilder) {
|
||||
QuicClientConnectionState conn(
|
||||
FizzClientQuicHandshakeContext::Builder().build());
|
||||
conn.transportSettings.dataPathType = DataPathType::ContinuousMemory;
|
||||
SimpleBufAccessor bufAccessor(2000);
|
||||
BufAccessor bufAccessor(2000);
|
||||
auto buf = bufAccessor.obtain();
|
||||
EXPECT_EQ(buf->length(), 0);
|
||||
bufAccessor.release(std::move(buf));
|
||||
@ -955,7 +955,7 @@ TEST_F(QuicPacketSchedulerTest, CloningSchedulerWithInplaceBuilderFullPacket) {
|
||||
conn.flowControlState.peerAdvertisedMaxOffset = 100000;
|
||||
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000;
|
||||
conn.transportSettings.dataPathType = DataPathType::ContinuousMemory;
|
||||
SimpleBufAccessor bufAccessor(2000);
|
||||
BufAccessor bufAccessor(2000);
|
||||
auto buf = bufAccessor.obtain();
|
||||
EXPECT_EQ(buf->length(), 0);
|
||||
bufAccessor.release(std::move(buf));
|
||||
@ -1818,7 +1818,7 @@ TEST_F(
|
||||
QuicClientConnectionState conn(
|
||||
FizzClientQuicHandshakeContext::Builder().build());
|
||||
conn.transportSettings.dataPathType = DataPathType::ContinuousMemory;
|
||||
SimpleBufAccessor bufAccessor(2000);
|
||||
BufAccessor bufAccessor(2000);
|
||||
auto buf = bufAccessor.obtain();
|
||||
EXPECT_EQ(buf->length(), 0);
|
||||
bufAccessor.release(std::move(buf));
|
||||
@ -1862,7 +1862,7 @@ TEST_F(
|
||||
QuicClientConnectionState conn(
|
||||
FizzClientQuicHandshakeContext::Builder().build());
|
||||
conn.transportSettings.dataPathType = DataPathType::ContinuousMemory;
|
||||
SimpleBufAccessor bufAccessor(2000);
|
||||
BufAccessor bufAccessor(2000);
|
||||
auto buf = bufAccessor.obtain();
|
||||
EXPECT_EQ(buf->length(), 0);
|
||||
bufAccessor.release(std::move(buf));
|
||||
|
@ -4287,12 +4287,11 @@ TEST_F(QuicTransportFunctionsTest, ProbeWriteNewFunctionalFramesAckFreq) {
|
||||
TEST_F(QuicTransportFunctionsTest, WriteWithInplaceBuilder) {
|
||||
auto conn = createConn();
|
||||
conn->transportSettings.dataPathType = DataPathType::ContinuousMemory;
|
||||
auto simpleBufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn->udpSendPacketLen * 16);
|
||||
auto outputBuf = simpleBufAccessor->obtain();
|
||||
auto bufAccessor = std::make_unique<BufAccessor>(conn->udpSendPacketLen * 16);
|
||||
auto outputBuf = bufAccessor->obtain();
|
||||
auto bufPtr = outputBuf.get();
|
||||
simpleBufAccessor->release(std::move(outputBuf));
|
||||
conn->bufAccessor = simpleBufAccessor.get();
|
||||
bufAccessor->release(std::move(outputBuf));
|
||||
conn->bufAccessor = bufAccessor.get();
|
||||
conn->transportSettings.batchingMode = QuicBatchingMode::BATCHING_MODE_GSO;
|
||||
EventBase evb;
|
||||
std::shared_ptr<FollyQuicEventBase> qEvb =
|
||||
@ -4330,12 +4329,11 @@ TEST_F(QuicTransportFunctionsTest, WriteWithInplaceBuilder) {
|
||||
TEST_F(QuicTransportFunctionsTest, WriteWithInplaceBuilderRollbackBuf) {
|
||||
auto conn = createConn();
|
||||
conn->transportSettings.dataPathType = DataPathType::ContinuousMemory;
|
||||
auto simpleBufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn->udpSendPacketLen * 16);
|
||||
auto outputBuf = simpleBufAccessor->obtain();
|
||||
auto bufAccessor = std::make_unique<BufAccessor>(conn->udpSendPacketLen * 16);
|
||||
auto outputBuf = bufAccessor->obtain();
|
||||
auto bufPtr = outputBuf.get();
|
||||
simpleBufAccessor->release(std::move(outputBuf));
|
||||
conn->bufAccessor = simpleBufAccessor.get();
|
||||
bufAccessor->release(std::move(outputBuf));
|
||||
conn->bufAccessor = bufAccessor.get();
|
||||
conn->transportSettings.batchingMode = QuicBatchingMode::BATCHING_MODE_GSO;
|
||||
EventBase evb;
|
||||
std::shared_ptr<FollyQuicEventBase> qEvb =
|
||||
@ -4359,12 +4357,11 @@ TEST_F(QuicTransportFunctionsTest, WriteWithInplaceBuilderRollbackBuf) {
|
||||
TEST_F(QuicTransportFunctionsTest, WriteWithInplaceBuilderGSOMultiplePackets) {
|
||||
auto conn = createConn();
|
||||
conn->transportSettings.dataPathType = DataPathType::ContinuousMemory;
|
||||
auto simpleBufAccessor =
|
||||
std::make_unique<SimpleBufAccessor>(conn->udpSendPacketLen * 16);
|
||||
auto outputBuf = simpleBufAccessor->obtain();
|
||||
auto bufAccessor = std::make_unique<BufAccessor>(conn->udpSendPacketLen * 16);
|
||||
auto outputBuf = bufAccessor->obtain();
|
||||
auto bufPtr = outputBuf.get();
|
||||
simpleBufAccessor->release(std::move(outputBuf));
|
||||
conn->bufAccessor = simpleBufAccessor.get();
|
||||
bufAccessor->release(std::move(outputBuf));
|
||||
conn->bufAccessor = bufAccessor.get();
|
||||
conn->transportSettings.batchingMode = QuicBatchingMode::BATCHING_MODE_GSO;
|
||||
EventBase evb;
|
||||
std::shared_ptr<FollyQuicEventBase> qEvb =
|
||||
@ -4409,7 +4406,7 @@ TEST_F(QuicTransportFunctionsTest, WriteProbingWithInplaceBuilder) {
|
||||
quic::test::MockAsyncUDPSocket mockSock(qEvb);
|
||||
EXPECT_CALL(mockSock, getGSO()).WillRepeatedly(Return(true));
|
||||
|
||||
SimpleBufAccessor bufAccessor(
|
||||
BufAccessor bufAccessor(
|
||||
conn->udpSendPacketLen * conn->transportSettings.maxBatchSize);
|
||||
conn->bufAccessor = &bufAccessor;
|
||||
auto buf = bufAccessor.obtain();
|
||||
|
@ -209,7 +209,7 @@ class QuicClientTransport
|
||||
}
|
||||
|
||||
void createBufAccessor(size_t capacity) override {
|
||||
bufAccessor_ = std::make_unique<SimpleBufAccessor>(capacity);
|
||||
bufAccessor_ = std::make_unique<BufAccessor>(capacity);
|
||||
conn_->bufAccessor = bufAccessor_.get();
|
||||
}
|
||||
|
||||
|
@ -100,10 +100,9 @@ class QuicPacketBuilderTest : public TestWithParam<TestFlavor> {
|
||||
pktSizeLimit, std::move(header), largestAckedPacketNum);
|
||||
case TestFlavor::Inplace:
|
||||
CHECK(outputBufSize);
|
||||
simpleBufAccessor_ =
|
||||
std::make_unique<SimpleBufAccessor>(*outputBufSize);
|
||||
BufAccessor_ = std::make_unique<BufAccessor>(*outputBufSize);
|
||||
return std::make_unique<InplaceQuicPacketBuilder>(
|
||||
*simpleBufAccessor_,
|
||||
*BufAccessor_,
|
||||
pktSizeLimit,
|
||||
std::move(header),
|
||||
largestAckedPacketNum);
|
||||
@ -112,7 +111,7 @@ class QuicPacketBuilderTest : public TestWithParam<TestFlavor> {
|
||||
}
|
||||
|
||||
protected:
|
||||
std::unique_ptr<BufAccessor> simpleBufAccessor_;
|
||||
std::unique_ptr<BufAccessor> BufAccessor_;
|
||||
};
|
||||
|
||||
TEST_F(QuicPacketBuilderTest, SimpleVersionNegotiationPacket) {
|
||||
@ -328,7 +327,7 @@ TEST_P(QuicPacketBuilderTest, EnforcePacketSizeWithCipherOverhead) {
|
||||
} else {
|
||||
EXPECT_EQ(builtOut.body.isManagedOne(), false);
|
||||
InplaceSizeEnforcedPacketBuilder sizeEnforcedBuilder(
|
||||
*simpleBufAccessor_, std::move(builtOut), enforcedSize, cipherOverhead);
|
||||
*BufAccessor_, std::move(builtOut), enforcedSize, cipherOverhead);
|
||||
EXPECT_TRUE(sizeEnforcedBuilder.canBuildPacket());
|
||||
auto out = std::move(sizeEnforcedBuilder).buildPacket();
|
||||
EXPECT_EQ(
|
||||
@ -483,7 +482,7 @@ TEST_P(QuicPacketBuilderTest, ShortHeaderBytesCounting) {
|
||||
}
|
||||
|
||||
TEST_P(QuicPacketBuilderTest, InplaceBuilderReleaseBufferInDtor) {
|
||||
SimpleBufAccessor bufAccessor(2000);
|
||||
BufAccessor bufAccessor(2000);
|
||||
EXPECT_TRUE(bufAccessor.ownsBuffer());
|
||||
auto builder = std::make_unique<InplaceQuicPacketBuilder>(
|
||||
bufAccessor,
|
||||
@ -496,7 +495,7 @@ TEST_P(QuicPacketBuilderTest, InplaceBuilderReleaseBufferInDtor) {
|
||||
}
|
||||
|
||||
TEST_P(QuicPacketBuilderTest, InplaceBuilderReleaseBufferInBuild) {
|
||||
SimpleBufAccessor bufAccessor(2000);
|
||||
BufAccessor bufAccessor(2000);
|
||||
EXPECT_TRUE(bufAccessor.ownsBuffer());
|
||||
auto builder = std::make_unique<InplaceQuicPacketBuilder>(
|
||||
bufAccessor,
|
||||
@ -511,7 +510,7 @@ TEST_P(QuicPacketBuilderTest, InplaceBuilderReleaseBufferInBuild) {
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketBuilderTest, BuildTwoInplaces) {
|
||||
SimpleBufAccessor bufAccessor(2000);
|
||||
BufAccessor bufAccessor(2000);
|
||||
EXPECT_TRUE(bufAccessor.ownsBuffer());
|
||||
auto builder1 = std::make_unique<InplaceQuicPacketBuilder>(
|
||||
bufAccessor,
|
||||
|
@ -9,21 +9,25 @@
|
||||
|
||||
namespace quic {
|
||||
|
||||
SimpleBufAccessor::SimpleBufAccessor(Buf buf)
|
||||
BufAccessor::BufAccessor(Buf buf)
|
||||
: buf_(std::move(buf)), capacity_(buf_->capacity()) {
|
||||
CHECK(!buf_->isShared() && !buf_->isChained());
|
||||
}
|
||||
|
||||
SimpleBufAccessor::SimpleBufAccessor(size_t capacity)
|
||||
: SimpleBufAccessor(folly::IOBuf::createCombined(capacity)) {}
|
||||
BufAccessor::BufAccessor(size_t capacity)
|
||||
: BufAccessor(folly::IOBuf::createCombined(capacity)) {}
|
||||
|
||||
Buf SimpleBufAccessor::obtain() {
|
||||
Buf BufAccessor::obtain() {
|
||||
Buf ret;
|
||||
buf_.swap(ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void SimpleBufAccessor::release(Buf buf) {
|
||||
Buf& BufAccessor::buf() {
|
||||
return buf_;
|
||||
}
|
||||
|
||||
void BufAccessor::release(Buf buf) {
|
||||
CHECK(!buf_) << "Can't override existing buf";
|
||||
CHECK(buf) << "Invalid Buf being released";
|
||||
CHECK_EQ(buf->capacity(), capacity_)
|
||||
@ -33,7 +37,52 @@ void SimpleBufAccessor::release(Buf buf) {
|
||||
buf_ = std::move(buf);
|
||||
}
|
||||
|
||||
bool SimpleBufAccessor::ownsBuffer() const {
|
||||
bool BufAccessor::ownsBuffer() const {
|
||||
return (buf_ != nullptr);
|
||||
}
|
||||
|
||||
const uint8_t* BufAccessor::tail() const {
|
||||
return buf_->tail();
|
||||
}
|
||||
|
||||
const uint8_t* BufAccessor::data() const {
|
||||
return buf_->data();
|
||||
}
|
||||
|
||||
std::size_t BufAccessor::tailroom() const {
|
||||
return buf_->tailroom();
|
||||
}
|
||||
|
||||
std::size_t BufAccessor::headroom() const {
|
||||
return buf_->headroom();
|
||||
}
|
||||
|
||||
std::size_t BufAccessor::length() const {
|
||||
return buf_->length();
|
||||
}
|
||||
|
||||
void BufAccessor::clear() {
|
||||
buf_->clear();
|
||||
}
|
||||
|
||||
bool BufAccessor::isChained() const {
|
||||
return buf_->isChained();
|
||||
}
|
||||
|
||||
void BufAccessor::trimEnd(std::size_t amount) {
|
||||
buf_->trimEnd(amount);
|
||||
}
|
||||
|
||||
void BufAccessor::trimStart(std::size_t amount) {
|
||||
buf_->trimStart(amount);
|
||||
}
|
||||
|
||||
uint8_t* BufAccessor::writableTail() {
|
||||
return buf_->writableTail();
|
||||
}
|
||||
|
||||
void BufAccessor::append(std::size_t amount) {
|
||||
buf_->append(amount);
|
||||
}
|
||||
|
||||
} // namespace quic
|
||||
|
@ -11,64 +11,63 @@
|
||||
|
||||
namespace quic {
|
||||
|
||||
/*
|
||||
* We use the BufAccessor in order to access a section of contiguous memory.
|
||||
* Right now, it works on an unchained IOBuf under the hood, but the plan is
|
||||
* to change it to have a uint8_t* under the hood. Once that's done, we can
|
||||
* remove the IOBuf-specific APIs, namely buf(), obtain(), and release().
|
||||
*/
|
||||
class BufAccessor {
|
||||
public:
|
||||
virtual ~BufAccessor() = default;
|
||||
explicit BufAccessor(Buf buf);
|
||||
|
||||
/**
|
||||
* BufAccessor gives caller the internal IOBuf.
|
||||
*/
|
||||
virtual Buf obtain() = 0;
|
||||
// The result capacity could be higher than the desired capacity.
|
||||
explicit BufAccessor(size_t capacity);
|
||||
|
||||
~BufAccessor() = default;
|
||||
|
||||
// API will be removed once we make the BufAccessor work on a uint8_t* instead
|
||||
// of an IOBuf.
|
||||
Buf& buf();
|
||||
|
||||
// API will be removed once we make the BufAccessor work on a uint8_t* instead
|
||||
// of an IOBuf.
|
||||
Buf obtain();
|
||||
|
||||
/**
|
||||
* Caller releases the IOBuf back to the accessor to own. The capacity has to
|
||||
* match the original IOBuf.
|
||||
* match the original IOBuf. API will be removed once we make the BufAccessor
|
||||
* work on a uint8_t* instead of an IOBuf.
|
||||
*/
|
||||
virtual void release(Buf buf) = 0;
|
||||
void release(Buf buf);
|
||||
|
||||
/**
|
||||
* Returns whether the BufAccessor currently owns an IOBuf.
|
||||
*/
|
||||
virtual bool ownsBuffer() const = 0;
|
||||
};
|
||||
bool ownsBuffer() const;
|
||||
|
||||
class SimpleBufAccessor : public BufAccessor {
|
||||
public:
|
||||
explicit SimpleBufAccessor(Buf buf);
|
||||
// Mirrored APIs from IOBuf.h
|
||||
const uint8_t* tail() const;
|
||||
const uint8_t* data() const;
|
||||
std::size_t tailroom() const;
|
||||
std::size_t headroom() const;
|
||||
|
||||
// The result capacity could be higher than the desired capacity.
|
||||
explicit SimpleBufAccessor(size_t capacity);
|
||||
std::size_t length() const;
|
||||
|
||||
~SimpleBufAccessor() override = default;
|
||||
void clear();
|
||||
|
||||
Buf obtain() override;
|
||||
bool isChained() const;
|
||||
|
||||
void release(Buf buf) override;
|
||||
void trimEnd(std::size_t amount);
|
||||
|
||||
bool ownsBuffer() const override;
|
||||
void trimStart(std::size_t amount);
|
||||
|
||||
uint8_t* writableTail();
|
||||
|
||||
void append(std::size_t amount);
|
||||
|
||||
private:
|
||||
Buf buf_;
|
||||
size_t capacity_;
|
||||
};
|
||||
|
||||
struct ScopedBufAccessor {
|
||||
public:
|
||||
explicit ScopedBufAccessor(BufAccessor* accessor) : bufAccessor_(accessor) {
|
||||
CHECK(bufAccessor_->ownsBuffer());
|
||||
buf_ = bufAccessor_->obtain();
|
||||
}
|
||||
|
||||
~ScopedBufAccessor() {
|
||||
bufAccessor_->release(std::move(buf_));
|
||||
}
|
||||
|
||||
std::unique_ptr<folly::IOBuf>& buf() {
|
||||
return buf_;
|
||||
}
|
||||
|
||||
private:
|
||||
BufAccessor* bufAccessor_;
|
||||
std::unique_ptr<folly::IOBuf> buf_;
|
||||
};
|
||||
} // namespace quic
|
||||
|
@ -10,8 +10,8 @@
|
||||
#include <folly/portability/GTest.h>
|
||||
|
||||
namespace quic {
|
||||
TEST(SimpleBufAccessor, BasicAccess) {
|
||||
SimpleBufAccessor accessor(1000);
|
||||
TEST(BufAccessor, BasicAccess) {
|
||||
BufAccessor accessor(1000);
|
||||
EXPECT_TRUE(accessor.ownsBuffer());
|
||||
auto buf = accessor.obtain();
|
||||
EXPECT_LE(1000, buf->capacity());
|
||||
@ -23,15 +23,15 @@ TEST(SimpleBufAccessor, BasicAccess) {
|
||||
EXPECT_DEATH(accessor.release(std::move(buf)), "");
|
||||
}
|
||||
|
||||
TEST(SimpleBufAccessor, CapacityMatch) {
|
||||
SimpleBufAccessor accessor(1000);
|
||||
TEST(BufAccessor, CapacityMatch) {
|
||||
BufAccessor accessor(1000);
|
||||
auto buf = accessor.obtain();
|
||||
buf = folly::IOBuf::create(2000);
|
||||
EXPECT_DEATH(accessor.release(std::move(buf)), "");
|
||||
}
|
||||
|
||||
TEST(SimpleBufAccessor, RefuseChainedBuf) {
|
||||
SimpleBufAccessor accessor(1000);
|
||||
TEST(BufAccessor, RefuseChainedBuf) {
|
||||
BufAccessor accessor(1000);
|
||||
auto buf = accessor.obtain();
|
||||
buf->prependChain(folly::IOBuf::create(0));
|
||||
EXPECT_DEATH(accessor.release(std::move(buf)), "");
|
||||
|
@ -150,8 +150,7 @@ BufQuicBatchResult PacketGroupWriter::writePacketsGroup(
|
||||
static auto& getThreadLocalConn(size_t maxPackets = 44) {
|
||||
static thread_local QuicConnectionStateBase fakeConn{QuicNodeType::Server};
|
||||
static thread_local bool initAccessor [[maybe_unused]] = [&]() {
|
||||
fakeConn.bufAccessor =
|
||||
new SimpleBufAccessor{kDefaultMaxUDPPayload * maxPackets};
|
||||
fakeConn.bufAccessor = new BufAccessor{kDefaultMaxUDPPayload * maxPackets};
|
||||
// Store this so we can use it to set the batch writer.
|
||||
fakeConn.transportSettings.maxBatchSize = maxPackets;
|
||||
return true;
|
||||
@ -232,7 +231,7 @@ BufAccessor* XskPacketGroupWriter::getBufAccessor() {
|
||||
[](void* /* buf */, void* /* userData */) {
|
||||
// Empty destructor because we don't own the buffer
|
||||
});
|
||||
bufAccessor_ = std::make_unique<SimpleBufAccessor>(std::move(ioBuf));
|
||||
bufAccessor_ = std::make_unique<BufAccessor>(std::move(ioBuf));
|
||||
return bufAccessor_.get();
|
||||
}
|
||||
|
||||
|
@ -234,7 +234,7 @@ class XskPacketGroupWriter : public PacketGroupWriter {
|
||||
folly::SocketAddress vipAddress_;
|
||||
facebook::xdpsocket::XskBuffer currentXskBuffer_;
|
||||
BufQuicBatchResult result_;
|
||||
std::unique_ptr<SimpleBufAccessor> bufAccessor_;
|
||||
std::unique_ptr<BufAccessor> bufAccessor_;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -69,7 +69,7 @@ TEST_F(DSRPacketizerSingleWriteTest, SingleWrite) {
|
||||
size_t length = 100;
|
||||
bool eof = false;
|
||||
auto dcid = test::getTestConnectionId();
|
||||
SimpleBufAccessor accessor{16 * kDefaultMaxUDPPayload};
|
||||
BufAccessor accessor{16 * kDefaultMaxUDPPayload};
|
||||
UdpSocketPacketGroupWriter packetGroupWriter(
|
||||
*socket, peerAddress, std::move(batchWriter));
|
||||
auto ret = packetGroupWriter.writeSingleQuicPacket(
|
||||
@ -109,7 +109,7 @@ TEST_F(DSRPacketizerSingleWriteTest, NotEnoughData) {
|
||||
size_t offset = 0;
|
||||
size_t length = 100;
|
||||
bool eof = false;
|
||||
SimpleBufAccessor accessor{16 * kDefaultMaxUDPPayload};
|
||||
BufAccessor accessor{16 * kDefaultMaxUDPPayload};
|
||||
auto ret = packetGroupWriter.writeSingleQuicPacket(
|
||||
accessor,
|
||||
test::getTestConnectionId(),
|
||||
|
@ -72,7 +72,7 @@ QuicServerWorker::QuicServerWorker(
|
||||
// TODO: maxBatchSize is only a good start value when each transport does
|
||||
// its own socket writing. If we experiment with multiple transports GSO
|
||||
// together, we will need a better value.
|
||||
bufAccessor_ = std::make_unique<SimpleBufAccessor>(
|
||||
bufAccessor_ = std::make_unique<BufAccessor>(
|
||||
kDefaultMaxUDPPayload * transportSettings_.maxBatchSize);
|
||||
VLOG(10) << "GSO write buf accessor created for ContinuousMemory data path";
|
||||
}
|
||||
|
Reference in New Issue
Block a user