1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-11-10 21:22:20 +03:00

Move Quic Stream data writing from WriteCodec to PacketBuilder

Summary:
Different Builders now want to have its own way of writing or
appending write buffer. So let builders handle them.

This also add new APIs in BufWriter to copy data from IOBuf/BufQueue directly
into a destination IOBuf without cloning inbetween.

Reviewed By: mjoras

Differential Revision: D20821789

fbshipit-source-id: c0a24eb12378f64cf26c27d4232f610ed80fba84
This commit is contained in:
Yang Chi
2020-04-07 08:43:40 -07:00
committed by Facebook GitHub Bot
parent 826031a8f2
commit db58ba1ca4
9 changed files with 199 additions and 29 deletions

View File

@@ -179,6 +179,24 @@ void RegularQuicPacketBuilder::insert(std::unique_ptr<folly::IOBuf> buf) {
bodyAppender_.insert(std::move(buf)); bodyAppender_.insert(std::move(buf));
} }
void RegularQuicPacketBuilder::insert(
std::unique_ptr<folly::IOBuf> buf,
size_t limit) {
std::unique_ptr<folly::IOBuf> streamData;
folly::io::Cursor cursor(buf.get());
cursor.clone(streamData, limit);
// reminaingBytes_ update is taken care of inside this insert call:
insert(std::move(streamData));
}
void RegularQuicPacketBuilder::insert(const BufQueue& buf, size_t limit) {
std::unique_ptr<folly::IOBuf> streamData;
folly::io::Cursor cursor(buf.front());
cursor.clone(streamData, limit);
// reminaingBytes_ update is taken care of inside this insert call:
insert(std::move(streamData));
}
void RegularQuicPacketBuilder::appendFrame(QuicWriteFrame frame) { void RegularQuicPacketBuilder::appendFrame(QuicWriteFrame frame) {
packet_.frames.push_back(std::move(frame)); packet_.frames.push_back(std::move(frame));
} }
@@ -422,7 +440,19 @@ void InplaceQuicPacketBuilder::appendBytes(
void InplaceQuicPacketBuilder::insert(std::unique_ptr<folly::IOBuf> buf) { void InplaceQuicPacketBuilder::insert(std::unique_ptr<folly::IOBuf> buf) {
remainingBytes_ -= buf->computeChainDataLength(); remainingBytes_ -= buf->computeChainDataLength();
bufWriter_.insert(std::move(buf)); bufWriter_.insert(buf.get());
}
void InplaceQuicPacketBuilder::insert(
std::unique_ptr<folly::IOBuf> buf,
size_t limit) {
remainingBytes_ -= limit;
bufWriter_.insert(buf.get(), limit);
}
void InplaceQuicPacketBuilder::insert(const BufQueue& buf, size_t limit) {
remainingBytes_ -= limit;
bufWriter_.insert(buf.front(), limit);
} }
void InplaceQuicPacketBuilder::appendFrame(QuicWriteFrame frame) { void InplaceQuicPacketBuilder::appendFrame(QuicWriteFrame frame) {

View File

@@ -69,6 +69,8 @@ class PacketBuilderInterface {
virtual void virtual void
appendBytes(BufWriter& writer, PacketNum value, uint8_t byteNumber) = 0; appendBytes(BufWriter& writer, PacketNum value, uint8_t byteNumber) = 0;
virtual void insert(std::unique_ptr<folly::IOBuf> buf) = 0; virtual void insert(std::unique_ptr<folly::IOBuf> buf) = 0;
virtual void insert(std::unique_ptr<folly::IOBuf> buf, size_t limit) = 0;
virtual void insert(const BufQueue& buf, size_t limit) = 0;
virtual void push(const uint8_t* data, size_t len) = 0; virtual void push(const uint8_t* data, size_t len) = 0;
// Append a frame to the packet. // Append a frame to the packet.
@@ -126,6 +128,8 @@ class InplaceQuicPacketBuilder final : public PacketBuilderInterface {
void appendBytes(BufWriter& writer, PacketNum value, uint8_t byteNumber) void appendBytes(BufWriter& writer, PacketNum value, uint8_t byteNumber)
override; override;
void insert(std::unique_ptr<folly::IOBuf> buf) override; void insert(std::unique_ptr<folly::IOBuf> buf) override;
void insert(std::unique_ptr<folly::IOBuf> buf, size_t limit) override;
void insert(const BufQueue& buf, size_t limit) override;
void push(const uint8_t* data, size_t len) override; void push(const uint8_t* data, size_t len) override;
void appendFrame(QuicWriteFrame frame) override; void appendFrame(QuicWriteFrame frame) override;
@@ -186,6 +190,9 @@ class RegularQuicPacketBuilder final : public PacketBuilderInterface {
CHECK(false) << "Invalid BufWriter"; CHECK(false) << "Invalid BufWriter";
} }
void insert(std::unique_ptr<folly::IOBuf> buf) override; void insert(std::unique_ptr<folly::IOBuf> buf) override;
void insert(std::unique_ptr<folly::IOBuf> buf, size_t limit) override;
void insert(const BufQueue& buf, size_t limit) override;
void push(const uint8_t* data, size_t len) override; void push(const uint8_t* data, size_t len) override;
void appendFrame(QuicWriteFrame frame) override; void appendFrame(QuicWriteFrame frame) override;
@@ -319,6 +326,14 @@ class PacketBuilderWrapper : public PacketBuilderInterface {
builder.insert(std::move(buf)); builder.insert(std::move(buf));
} }
void insert(std::unique_ptr<folly::IOBuf> buf, size_t limit) override {
builder.insert(std::move(buf), limit);
}
void insert(const BufQueue& buf, size_t limit) override {
builder.insert(buf, limit);
}
void appendFrame(QuicWriteFrame frame) override { void appendFrame(QuicWriteFrame frame) override {
builder.appendFrame(std::move(frame)); builder.appendFrame(std::move(frame));
} }

View File

@@ -140,10 +140,7 @@ void writeStreamFrameData(
const BufQueue& writeBuffer, const BufQueue& writeBuffer,
uint64_t dataLen) { uint64_t dataLen) {
if (dataLen > 0) { if (dataLen > 0) {
Buf streamData; builder.insert(writeBuffer, dataLen);
folly::io::Cursor cursor(writeBuffer.front());
cursor.clone(streamData, dataLen);
builder.insert(std::move(streamData));
} }
} }
@@ -152,10 +149,7 @@ void writeStreamFrameData(
Buf writeBuffer, Buf writeBuffer,
uint64_t dataLen) { uint64_t dataLen) {
if (dataLen > 0) { if (dataLen > 0) {
Buf streamData; builder.insert(std::move(writeBuffer), dataLen);
folly::io::Cursor cursor(writeBuffer.get());
cursor.clone(streamData, dataLen);
builder.insert(std::move(streamData));
} }
} }

View File

@@ -90,6 +90,8 @@ void writeStreamFrameData(
* This writes dataLen worth of bytes from the parameter writeBuffer into the * This writes dataLen worth of bytes from the parameter writeBuffer into the
* parameter builder. This should only be called after a complete stream header * parameter builder. This should only be called after a complete stream header
* has been written by writeStreamFrameHeader. * has been written by writeStreamFrameHeader.
*
* TODO: we can let the Buf be a ref and stop the cloning in PacketRebuilder.
*/ */
void writeStreamFrameData( void writeStreamFrameData(
PacketBuilderInterface& builder, PacketBuilderInterface& builder,

View File

@@ -52,8 +52,14 @@ class MockQuicPacketBuilder : public PacketBuilderInterface {
void insert(std::unique_ptr<folly::IOBuf> buf) override { void insert(std::unique_ptr<folly::IOBuf> buf) override {
_insert(buf); _insert(buf);
} }
void insert(std::unique_ptr<folly::IOBuf> buf, size_t limit) override {
_insert(buf, limit);
}
MOCK_METHOD1(appendFrame, void(QuicWriteFrame)); MOCK_METHOD1(appendFrame, void(QuicWriteFrame));
MOCK_METHOD1(_insert, void(std::unique_ptr<folly::IOBuf>&)); MOCK_METHOD1(_insert, void(std::unique_ptr<folly::IOBuf>&));
MOCK_METHOD2(_insert, void(std::unique_ptr<folly::IOBuf>&, size_t));
MOCK_METHOD2(insert, void(const BufQueue&, size_t));
MOCK_METHOD2(push, void(const uint8_t*, size_t)); MOCK_METHOD2(push, void(const uint8_t*, size_t));
MOCK_METHOD1(write, void(const QuicInteger&)); MOCK_METHOD1(write, void(const QuicInteger&));

View File

@@ -84,6 +84,25 @@ void setupCommonExpects(MockQuicPacketBuilder& pktBuilder) {
pktBuilder.appender_.insert(std::move(buf)); pktBuilder.appender_.insert(std::move(buf));
}))); })));
EXPECT_CALL(pktBuilder, _insert(_, _))
.WillRepeatedly(WithArgs<0, 1>(Invoke([&](Buf& buf, size_t limit) {
pktBuilder.remaining_ -= limit;
std::unique_ptr<folly::IOBuf> cloneBuf;
folly::io::Cursor cursor(buf.get());
cursor.clone(cloneBuf, limit);
pktBuilder.appender_.insert(std::move(cloneBuf));
})));
EXPECT_CALL(pktBuilder, insert(_, _))
.WillRepeatedly(
WithArgs<0, 1>(Invoke([&](const BufQueue& buf, size_t limit) {
pktBuilder.remaining_ -= limit;
std::unique_ptr<folly::IOBuf> cloneBuf;
folly::io::Cursor cursor(buf.front());
cursor.clone(cloneBuf, limit);
pktBuilder.appender_.insert(std::move(cloneBuf));
})));
EXPECT_CALL(pktBuilder, push(_, _)) EXPECT_CALL(pktBuilder, push(_, _))
.WillRepeatedly( .WillRepeatedly(
WithArgs<0, 1>(Invoke([&](const uint8_t* data, size_t len) { WithArgs<0, 1>(Invoke([&](const uint8_t* data, size_t len) {

View File

@@ -162,8 +162,13 @@ void BufWriter::push(const uint8_t* data, size_t len) {
written_ += len; written_ += len;
} }
void BufWriter::insert(std::unique_ptr<folly::IOBuf> data) { void BufWriter::insert(const folly::IOBuf* data) {
copy(std::move(data)); auto totalLength = data->computeChainDataLength();
insert(data, totalLength);
}
void BufWriter::insert(const folly::IOBuf* data, size_t limit) {
copy(data, limit);
} }
void BufWriter::append(size_t len) { void BufWriter::append(size_t len) {
@@ -172,18 +177,25 @@ void BufWriter::append(size_t len) {
appendCount_ += len; appendCount_ += len;
} }
void BufWriter::copy(std::unique_ptr<folly::IOBuf> data) { void BufWriter::copy(const folly::IOBuf* data, size_t limit) {
sizeCheck(data->computeChainDataLength()); if (!limit) {
size_t totalInserted = 0; return;
folly::IOBuf* curBuf = data.get();
while (curBuf != data->prev()) {
push(curBuf->data(), curBuf->length());
totalInserted += curBuf->length();
curBuf = curBuf->next();
} }
push(curBuf->data(), curBuf->length()); sizeCheck(limit);
totalInserted += curBuf->length(); size_t totalInserted = 0;
CHECK_EQ(data->computeChainDataLength(), totalInserted); const folly::IOBuf* curBuf = data;
auto remaining = limit;
do {
auto lenToCopy = std::min(curBuf->length(), remaining);
push(curBuf->data(), lenToCopy);
totalInserted += lenToCopy;
remaining -= lenToCopy;
if (lenToCopy < curBuf->length()) {
break;
}
curBuf = curBuf->next();
} while (remaining && curBuf != data);
CHECK_GE(limit, totalInserted);
} }
void BufWriter::backFill(const uint8_t* data, size_t len, size_t destOffset) { void BufWriter::backFill(const uint8_t* data, size_t len, size_t destOffset) {

View File

@@ -109,7 +109,8 @@ class BufWriter {
// TODO: OK, "insert" is a lie. Inside, we copy. But I'd like the BufWriter // TODO: OK, "insert" is a lie. Inside, we copy. But I'd like the BufWriter
// to have the same interface as BufAppender during the transition period. // to have the same interface as BufAppender during the transition period.
void insert(std::unique_ptr<folly::IOBuf> data); void insert(const folly::IOBuf* data);
void insert(const folly::IOBuf* data, size_t limit);
void append(size_t len); void append(size_t len);
@@ -124,7 +125,7 @@ class BufWriter {
<< " written=" << written_ << " limit=" << most_; << " written=" << written_ << " limit=" << most_;
} }
void copy(std::unique_ptr<folly::IOBuf> data); void copy(const folly::IOBuf* data, size_t limit);
private: private:
folly::IOBuf& iobuf_; folly::IOBuf& iobuf_;

View File

@@ -355,9 +355,9 @@ TEST(BufWriterTest, InsertSingle) {
auto inputBuffer = auto inputBuffer =
folly::IOBuf::copyBuffer("Steady on dreaming, I sleepwalk"); folly::IOBuf::copyBuffer("Steady on dreaming, I sleepwalk");
auto len = inputBuffer->computeChainDataLength(); auto len = inputBuffer->computeChainDataLength();
writer.insert(std::move(inputBuffer)); writer.insert(inputBuffer.get());
folly::io::Cursor reader(testBuffer.get()); folly::io::Cursor reader(testBuffer.get());
EXPECT_EQ("Steady on dreaming, I sleepwalk", reader.readFixedString(len)); EXPECT_EQ(inputBuffer->coalesce().str(), reader.readFixedString(len));
} }
TEST(BufWriterTest, InsertChain) { TEST(BufWriterTest, InsertChain) {
@@ -368,14 +368,14 @@ TEST(BufWriterTest, InsertChain) {
inputBuffer->prependChain( inputBuffer->prependChain(
folly::IOBuf::copyBuffer(" Can't believe that we are through.")); folly::IOBuf::copyBuffer(" Can't believe that we are through."));
inputBuffer->prependChain( inputBuffer->prependChain(
folly::IOBuf::copyBuffer(" While the memor of you linger like a song.")); folly::IOBuf::copyBuffer(" While the memory of you linger like a song."));
auto len = inputBuffer->computeChainDataLength(); auto len = inputBuffer->computeChainDataLength();
writer.insert(std::move(inputBuffer)); writer.insert(inputBuffer.get());
folly::io::Cursor reader(testBuffer.get()); folly::io::Cursor reader(testBuffer.get());
EXPECT_EQ( EXPECT_EQ(
"Cause I lost you and now what am i to do?" "Cause I lost you and now what am i to do?"
" Can't believe that we are through." " Can't believe that we are through."
" While the memor of you linger like a song.", " While the memory of you linger like a song.",
reader.readFixedString(len)); reader.readFixedString(len));
} }
@@ -396,3 +396,94 @@ TEST(BufWriterTest, BackFill) {
reader.readFixedString( reader.readFixedString(
testInput1.size() + testInput2.size() + testInput3.size())); testInput1.size() + testInput2.size() + testInput3.size()));
} }
TEST(BufWriterTest, BufQueueCopy) {
BufQueue queue;
queue.append(folly::IOBuf::copyBuffer("I feel like I'm drowning"));
auto outputBuffer = folly::IOBuf::create(200);
BufWriter bufWriter(*outputBuffer, 200);
bufWriter.insert(queue.front(), queue.chainLength());
folly::io::Cursor reader(outputBuffer.get());
EXPECT_EQ(
"I feel like I'm drowning", reader.readFixedString(queue.chainLength()));
}
TEST(BufWriterTest, BufQueueCopyPartial) {
BufQueue queue;
queue.append(folly::IOBuf::copyBuffer("I feel like I'm drowning"));
auto outputBuffer = folly::IOBuf::create(200);
BufWriter bufWriter(*outputBuffer, 200);
bufWriter.insert(queue.front(), 6);
folly::io::Cursor reader(outputBuffer.get());
EXPECT_EQ(
"I feel", reader.readFixedString(outputBuffer->computeChainDataLength()));
}
TEST(BufWriterTest, BufQueueChainCopy) {
BufQueue queue;
queue.append(folly::IOBuf::copyBuffer("I'm a hotpot. "));
queue.append(folly::IOBuf::copyBuffer("You mere are hotpot soup base."));
auto outputBuffer = folly::IOBuf::create(1000);
BufWriter bufWriter(*outputBuffer, 1000);
bufWriter.insert(queue.front(), queue.chainLength());
folly::io::Cursor reader(outputBuffer.get());
EXPECT_EQ(
"I'm a hotpot. You mere are hotpot soup base.",
reader.readFixedString(queue.chainLength()));
}
TEST(BufWriterTest, BufQueueChainCopyPartial) {
BufQueue queue;
std::string testStr1("I remember when I first noticed. ");
std::string testStr2("That you liked me back.");
queue.append(folly::IOBuf::copyBuffer(testStr1));
queue.append(folly::IOBuf::copyBuffer(testStr2));
auto outputBuffer = folly::IOBuf::create(1000);
BufWriter bufWriter(*outputBuffer, 1000);
bufWriter.insert(queue.front(), testStr1.size() + 10);
folly::io::Cursor reader(outputBuffer.get());
EXPECT_EQ(
folly::to<std::string>(testStr1, "That you l"),
reader.readFixedString(testStr1.size() + 10));
}
TEST(BufWriterTest, IOBufChainCopyTooLargeLimit) {
auto outputBuffer = folly::IOBuf::create(1000);
BufWriter bufWriter(*outputBuffer, 1000);
auto inputBuffer =
folly::IOBuf::copyBuffer("Tired of seeing adventures on a cafe wall. ");
inputBuffer->prependChain(
folly::IOBuf::copyBuffer("Think I'll take a turn from the known road. "));
inputBuffer->prependChain(
folly::IOBuf::copyBuffer("Think I'll write a tale of my own."));
// Use a limit thats larger than input size
bufWriter.insert(
inputBuffer.get(), inputBuffer->computeChainDataLength() * 3);
folly::io::Cursor reader(outputBuffer.get());
EXPECT_EQ(
"Tired of seeing adventures on a cafe wall. "
"Think I'll take a turn from the known road. "
"Think I'll write a tale of my own.",
reader.readFixedString(outputBuffer->computeChainDataLength()));
}
TEST(BufWriterTest, BufQueueChainCopyTooLargeLimit) {
BufQueue queue;
std::string testStr1("I see trees of green. ");
std::string testStr2("Red rose too. ");
std::string testStr3("I see them bloom.");
queue.append(folly::IOBuf::copyBuffer(testStr1));
queue.append(folly::IOBuf::copyBuffer(testStr2));
queue.append(folly::IOBuf::copyBuffer(testStr3));
auto outputBuffer = folly::IOBuf::create(1000);
BufWriter bufWriter(*outputBuffer, 1000);
bufWriter.insert(
queue.front(), (testStr1.size() + testStr2.size() + testStr3.size()) * 5);
folly::io::Cursor reader(outputBuffer.get());
EXPECT_EQ(
"I see trees of green. "
"Red rose too. "
"I see them bloom.",
reader.readFixedString(
testStr1.size() + testStr2.size() + testStr3.size()));
}