1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

iobufqueue diediedie

Summary:
Don't use IOBufQueue for most operations in mvfst and use BufQueue instead. Since BufQueue did not support a splitAtMost, added it in instead.

The only place that we still use IOBufQueue is in crypto because fizz still requires it

Reviewed By: mjoras

Differential Revision: D18846960

fbshipit-source-id: 4320b7f8614f8d2c75f6de0e6b786d33650e9656
This commit is contained in:
Subodh Iyengar
2019-12-06 12:04:23 -08:00
committed by Facebook Github Bot
parent cadedee2fd
commit e524c0c069
27 changed files with 109 additions and 69 deletions

View File

@@ -96,7 +96,8 @@ void handleNewStreamDataWritten(
// Idealy we should also check this data doesn't exist in either retx buffer // Idealy we should also check this data doesn't exist in either retx buffer
// or loss buffer, but that's an expensive search. // or loss buffer, but that's an expensive search.
stream.currentWriteOffset += frameLen; stream.currentWriteOffset += frameLen;
auto bufWritten = stream.writeBuffer.split(folly::to<size_t>(frameLen)); auto bufWritten = stream.writeBuffer.splitAtMost(folly::to<size_t>(frameLen));
DCHECK_EQ(bufWritten->computeChainDataLength(), frameLen);
stream.currentWriteOffset += frameFin ? 1 : 0; stream.currentWriteOffset += frameFin ? 1 : 0;
CHECK(stream.retransmissionBuffer CHECK(stream.retransmissionBuffer
.emplace( .emplace(
@@ -126,7 +127,7 @@ void handleRetransmissionWritten(
stream.lossBuffer.erase(lossBufferIter); stream.lossBuffer.erase(lossBufferIter);
} else { } else {
lossBufferIter->offset += frameLen; lossBufferIter->offset += frameLen;
bufWritten = lossBufferIter->data.split(frameLen); bufWritten = lossBufferIter->data.splitAtMost(frameLen);
} }
CHECK(stream.retransmissionBuffer CHECK(stream.retransmissionBuffer
.emplace( .emplace(

View File

@@ -16,6 +16,7 @@ quic_add_test(TARGET QuicTransportTest
QuicTransportTest.cpp QuicTransportTest.cpp
DEPENDS DEPENDS
Folly::folly Folly::folly
mvfst_bufutil
mvfst_transport mvfst_transport
mvfst_server mvfst_server
mvfst_state_stream_functions mvfst_state_stream_functions

View File

@@ -11,11 +11,11 @@
#include <folly/Random.h> #include <folly/Random.h>
#include <folly/io/Cursor.h> #include <folly/io/Cursor.h>
#include <folly/io/IOBufQueue.h>
#include <folly/io/async/test/MockAsyncUDPSocket.h> #include <folly/io/async/test/MockAsyncUDPSocket.h>
#include <quic/api/QuicTransportBase.h> #include <quic/api/QuicTransportBase.h>
#include <quic/api/QuicTransportFunctions.h> #include <quic/api/QuicTransportFunctions.h>
#include <quic/api/test/Mocks.h> #include <quic/api/test/Mocks.h>
#include <quic/common/BufUtil.h>
#include <quic/common/Timers.h> #include <quic/common/Timers.h>
#include <quic/common/test/TestUtils.h> #include <quic/common/test/TestUtils.h>
#include <quic/handshake/test/Mocks.h> #include <quic/handshake/test/Mocks.h>
@@ -307,7 +307,7 @@ void verifyCorrectness(
EXPECT_EQ(finExpected, finSet); EXPECT_EQ(finExpected, finSet);
// Verify retransmissionBuffer: // Verify retransmissionBuffer:
EXPECT_FALSE(stream->retransmissionBuffer.empty()); EXPECT_FALSE(stream->retransmissionBuffer.empty());
IOBufQueue retxBufCombined; BufQueue retxBufCombined;
std::vector<StreamBuffer> rtxCopy; std::vector<StreamBuffer> rtxCopy;
for (auto& itr : stream->retransmissionBuffer) { for (auto& itr : stream->retransmissionBuffer) {
rtxCopy.push_back(StreamBuffer( rtxCopy.push_back(StreamBuffer(
@@ -380,9 +380,6 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLoss) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto lossStream = transport_->createBidirectionalStream().value(); auto lossStream = transport_->createBidirectionalStream().value();
conn.streamManager->addLoss(lossStream); conn.streamManager->addLoss(lossStream);
conn.streamManager->getStream(lossStream)
->lossBuffer.emplace_back(
IOBuf::copyBuffer("Mountains may depart"), 0, false);
transport_->writeChain( transport_->writeChain(
stream, stream,
IOBuf::copyBuffer("An elephant sitting still"), IOBuf::copyBuffer("An elephant sitting still"),

View File

@@ -26,6 +26,7 @@ target_compile_options(
add_dependencies( add_dependencies(
mvfst_client mvfst_client
mvfst_bufutil
mvfst_flowcontrol mvfst_flowcontrol
mvfst_happyeyeballs mvfst_happyeyeballs
mvfst_loss mvfst_loss

View File

@@ -96,7 +96,7 @@ QuicClientTransport::~QuicClientTransport() {
void QuicClientTransport::processUDPData( void QuicClientTransport::processUDPData(
const folly::SocketAddress& peer, const folly::SocketAddress& peer,
NetworkDataSingle&& networkData) { NetworkDataSingle&& networkData) {
folly::IOBufQueue udpData{folly::IOBufQueue::cacheChainLength()}; BufQueue udpData;
udpData.append(std::move(networkData.data)); udpData.append(std::move(networkData.data));
if (!conn_->version) { if (!conn_->version) {
@@ -129,7 +129,7 @@ void QuicClientTransport::processUDPData(
void QuicClientTransport::processPacketData( void QuicClientTransport::processPacketData(
const folly::SocketAddress& peer, const folly::SocketAddress& peer,
TimePoint receiveTimePoint, TimePoint receiveTimePoint,
folly::IOBufQueue& packetQueue) { BufQueue& packetQueue) {
auto packetSize = packetQueue.chainLength(); auto packetSize = packetQueue.chainLength();
if (packetSize == 0) { if (packetSize == 0) {
return; return;

View File

@@ -15,6 +15,7 @@
#include <quic/api/QuicTransportBase.h> #include <quic/api/QuicTransportBase.h>
#include <quic/client/handshake/QuicPskCache.h> #include <quic/client/handshake/QuicPskCache.h>
#include <quic/client/state/ClientStateMachine.h> #include <quic/client/state/ClientStateMachine.h>
#include <quic/common/BufUtil.h>
namespace quic { namespace quic {
@@ -159,7 +160,7 @@ class QuicClientTransport
void processPacketData( void processPacketData(
const folly::SocketAddress& peer, const folly::SocketAddress& peer,
TimePoint receiveTimePoint, TimePoint receiveTimePoint,
folly::IOBufQueue& packetQueue); BufQueue& packetQueue);
void startCryptoHandshake(); void startCryptoHandshake();

View File

@@ -13,6 +13,7 @@ quic_add_test(TARGET QuicClientTransportTest
DEPENDS DEPENDS
Folly::folly Folly::folly
${LIBGMOCK_LIBRARIES} ${LIBGMOCK_LIBRARIES}
mvfst_bufutil
mvfst_client mvfst_client
mvfst_codec_types mvfst_codec_types
mvfst_handshake mvfst_handshake

View File

@@ -338,7 +338,7 @@ class QuicClientTransportIntegrationTest : public TestWithParam<TestingParams> {
class StreamData { class StreamData {
public: public:
folly::IOBufQueue data{folly::IOBufQueue::cacheChainLength()}; BufQueue data;
folly::Promise<StreamPair> promise; folly::Promise<StreamPair> promise;
StreamId id; StreamId id;
@@ -3115,7 +3115,7 @@ TEST_F(QuicClientTransportAfterStartTest, ReadStreamCoalescedMany) {
auto expected = IOBuf::copyBuffer("hello"); auto expected = IOBuf::copyBuffer("hello");
EXPECT_CALL(readCb, readAvailable(streamId)).Times(0); EXPECT_CALL(readCb, readAvailable(streamId)).Times(0);
FizzCryptoFactory cryptoFactory; FizzCryptoFactory cryptoFactory;
IOBufQueue packets{IOBufQueue::cacheChainLength()}; BufQueue packets;
for (int i = 0; i < kMaxNumCoalescedPackets; i++) { for (int i = 0; i < kMaxNumCoalescedPackets; i++) {
auto garbage = IOBuf::copyBuffer("garbage"); auto garbage = IOBuf::copyBuffer("garbage");
auto initialCipher = cryptoFactory.getServerInitialCipher( auto initialCipher = cryptoFactory.getServerInitialCipher(

View File

@@ -189,6 +189,7 @@ target_compile_options(
add_dependencies( add_dependencies(
mvfst_codec mvfst_codec
mvfst_bufutil
mvfst_constants mvfst_constants
mvfst_codec_decode mvfst_codec_decode
mvfst_codec_types mvfst_codec_types

View File

@@ -337,7 +337,7 @@ ReadStreamFrame decodeStreamFrame(
} }
// If dataLength > data's actual length then the cursor will throw. // If dataLength > data's actual length then the cursor will throw.
queue.trimStart(cursor - queue.front()); queue.trimStart(cursor - queue.front());
data = queue.split(dataLength->first); data = queue.splitAtMost(dataLength->first);
} else { } else {
// Missing Data Length field doesn't mean no data. It means the rest of the // Missing Data Length field doesn't mean no data. It means the rest of the
// frame are all data. // frame are all data.

View File

@@ -37,7 +37,7 @@ constexpr auto kLongHeaderHeaderSize = sizeof(uint8_t) /* Type bytes */ +
constexpr auto kCipherOverheadHeuristic = 16; constexpr auto kCipherOverheadHeuristic = 16;
// TODO: i'm sure this isn't the optimal value: // TODO: i'm sure this isn't the optimal value:
// IOBufQueue growth byte size for in PacketBuilder: // Appender growth byte size for in PacketBuilder:
constexpr size_t kAppenderGrowthSize = 100; constexpr size_t kAppenderGrowthSize = 100;
class PacketBuilderInterface { class PacketBuilderInterface {

View File

@@ -23,7 +23,7 @@ namespace quic {
QuicReadCodec::QuicReadCodec(QuicNodeType nodeType) : nodeType_(nodeType) {} QuicReadCodec::QuicReadCodec(QuicNodeType nodeType) : nodeType_(nodeType) {}
folly::Optional<VersionNegotiationPacket> folly::Optional<VersionNegotiationPacket>
QuicReadCodec::tryParsingVersionNegotiation(folly::IOBufQueue& queue) { QuicReadCodec::tryParsingVersionNegotiation(BufQueue& queue) {
folly::io::Cursor cursor(queue.front()); folly::io::Cursor cursor(queue.front());
if (!cursor.canAdvance(sizeof(uint8_t))) { if (!cursor.canAdvance(sizeof(uint8_t))) {
return folly::none; return folly::none;
@@ -47,7 +47,7 @@ QuicReadCodec::tryParsingVersionNegotiation(folly::IOBufQueue& queue) {
} }
CodecResult QuicReadCodec::parseLongHeaderPacket( CodecResult QuicReadCodec::parseLongHeaderPacket(
folly::IOBufQueue& queue, BufQueue& queue,
const AckStates& ackStates) { const AckStates& ackStates) {
folly::io::Cursor cursor(queue.front()); folly::io::Cursor cursor(queue.front());
auto initialByte = cursor.readBE<uint8_t>(); auto initialByte = cursor.readBE<uint8_t>();
@@ -57,7 +57,7 @@ CodecResult QuicReadCodec::parseLongHeaderPacket(
// We've failed to parse the long header, so we have no idea where this // We've failed to parse the long header, so we have no idea where this
// packet ends. Clear the queue since no other data in this packet is // packet ends. Clear the queue since no other data in this packet is
// parse-able. // parse-able.
queue.clear(); queue.move();
return CodecResult(Nothing()); return CodecResult(Nothing());
} }
if (longHeaderInvariant->invariant.version == if (longHeaderInvariant->invariant.version ==
@@ -67,7 +67,7 @@ CodecResult QuicReadCodec::parseLongHeaderPacket(
// function. // function.
// Since VN is not allowed to be coalesced with another packet // Since VN is not allowed to be coalesced with another packet
// type, we clear out the buffer to avoid anyone else parsing it. // type, we clear out the buffer to avoid anyone else parsing it.
queue.clear(); queue.move();
return CodecResult(Nothing()); return CodecResult(Nothing());
} }
auto type = parseLongHeaderType(initialByte); auto type = parseLongHeaderType(initialByte);
@@ -79,7 +79,7 @@ CodecResult QuicReadCodec::parseLongHeaderPacket(
// We've failed to parse the long header, so we have no idea where this // We've failed to parse the long header, so we have no idea where this
// packet ends. Clear the queue since no other data in this packet is // packet ends. Clear the queue since no other data in this packet is
// parse-able. // parse-able.
queue.clear(); queue.move();
return CodecResult(Nothing()); return CodecResult(Nothing());
} }
// As soon as we have parsed out the long header we can split off any // As soon as we have parsed out the long header we can split off any
@@ -96,10 +96,10 @@ CodecResult QuicReadCodec::parseLongHeaderPacket(
packetNumberOffset + parsedLongHeader->packetLength.packetLength; packetNumberOffset + parsedLongHeader->packetLength.packetLength;
if (queue.chainLength() < currentPacketLen) { if (queue.chainLength() < currentPacketLen) {
// Packet appears truncated, there's no parse-able data left. // Packet appears truncated, there's no parse-able data left.
queue.clear(); queue.move();
return CodecResult(Nothing()); return CodecResult(Nothing());
} }
auto currentPacketData = queue.split(currentPacketLen); auto currentPacketData = queue.splitAtMost(currentPacketLen);
cursor.reset(currentPacketData.get()); cursor.reset(currentPacketData.get());
cursor.skip(packetNumberOffset); cursor.skip(packetNumberOffset);
// Sample starts after the max packet number size. This ensures that we // Sample starts after the max packet number size. This ensures that we
@@ -108,7 +108,7 @@ CodecResult QuicReadCodec::parseLongHeaderPacket(
VLOG(4) << "Dropping packet, not enough for packet number " VLOG(4) << "Dropping packet, not enough for packet number "
<< connIdToHex(); << connIdToHex();
// Packet appears truncated, there's no parse-able data left. // Packet appears truncated, there's no parse-able data left.
queue.clear(); queue.move();
return CodecResult(Nothing()); return CodecResult(Nothing());
} }
cursor.skip(kMaxPacketNumEncodingSize); cursor.skip(kMaxPacketNumEncodingSize);
@@ -116,7 +116,7 @@ CodecResult QuicReadCodec::parseLongHeaderPacket(
if (!cursor.canAdvance(sample.size())) { if (!cursor.canAdvance(sample.size())) {
VLOG(4) << "Dropping packet, sample too small " << connIdToHex(); VLOG(4) << "Dropping packet, sample too small " << connIdToHex();
// Packet appears truncated, there's no parse-able data left. // Packet appears truncated, there's no parse-able data left.
queue.clear(); queue.move();
return CodecResult(Nothing()); return CodecResult(Nothing());
} }
cursor.pull(sample.data(), sample.size()); cursor.pull(sample.data(), sample.size());
@@ -197,10 +197,10 @@ CodecResult QuicReadCodec::parseLongHeaderPacket(
initialByteRange.data()[0], packetNumberByteRange, expectedNextPacketNum); initialByteRange.data()[0], packetNumberByteRange, expectedNextPacketNum);
longHeader.setPacketNumber(packetNum.first); longHeader.setPacketNumber(packetNum.first);
folly::IOBufQueue decryptQueue{folly::IOBufQueue::cacheChainLength()}; BufQueue decryptQueue;
decryptQueue.append(std::move(currentPacketData)); decryptQueue.append(std::move(currentPacketData));
size_t aadLen = packetNumberOffset + packetNum.second; size_t aadLen = packetNumberOffset + packetNum.second;
auto headerData = decryptQueue.split(aadLen); auto headerData = decryptQueue.splitAtMost(aadLen);
// parsing verifies that packetLength >= packet number length. // parsing verifies that packetLength >= packet number length.
auto encryptedData = decryptQueue.splitAtMost( auto encryptedData = decryptQueue.splitAtMost(
parsedLongHeader->packetLength.packetLength - packetNum.second); parsedLongHeader->packetLength.packetLength - packetNum.second);
@@ -233,7 +233,7 @@ CodecResult QuicReadCodec::parseLongHeaderPacket(
} }
CodecResult QuicReadCodec::parsePacket( CodecResult QuicReadCodec::parsePacket(
folly::IOBufQueue& queue, BufQueue& queue,
const AckStates& ackStates, const AckStates& ackStates,
size_t dstConnIdSize) { size_t dstConnIdSize) {
if (queue.empty()) { if (queue.empty()) {
@@ -272,7 +272,7 @@ CodecResult QuicReadCodec::parsePacket(
VLOG(10) << "Dropping packet, too small for sample " << connIdToHex(); VLOG(10) << "Dropping packet, too small for sample " << connIdToHex();
// There's not enough space for the short header packet, clear the queue // There's not enough space for the short header packet, clear the queue
// to indicate there's no more parse-able data. // to indicate there's no more parse-able data.
queue.clear(); queue.move();
return CodecResult(Nothing()); return CodecResult(Nothing());
} }
// Take it out of the queue so we can do some writing. // Take it out of the queue so we can do some writing.

View File

@@ -13,6 +13,7 @@
#include <quic/codec/PacketNumber.h> #include <quic/codec/PacketNumber.h>
#include <quic/codec/PacketNumberCipher.h> #include <quic/codec/PacketNumberCipher.h>
#include <quic/codec/Types.h> #include <quic/codec/Types.h>
#include <quic/common/BufUtil.h>
#include <quic/handshake/Aead.h> #include <quic/handshake/Aead.h>
#include <quic/state/AckStates.h> #include <quic/state/AckStates.h>
@@ -94,7 +95,7 @@ class QuicReadCodec {
* before the version is negotiated to detect VN. * before the version is negotiated to detect VN.
*/ */
virtual CodecResult parsePacket( virtual CodecResult parsePacket(
folly::IOBufQueue& queue, BufQueue& queue,
const AckStates& ackStates, const AckStates& ackStates,
size_t dstConnIdSize = kDefaultConnectionIdSize); size_t dstConnIdSize = kDefaultConnectionIdSize);
@@ -105,7 +106,7 @@ class QuicReadCodec {
* a VN packet or is invalid. * a VN packet or is invalid.
*/ */
folly::Optional<VersionNegotiationPacket> tryParsingVersionNegotiation( folly::Optional<VersionNegotiationPacket> tryParsingVersionNegotiation(
folly::IOBufQueue& queue); BufQueue& queue);
const Aead* getOneRttReadCipher() const; const Aead* getOneRttReadCipher() const;
const Aead* getZeroRttReadCipher() const; const Aead* getZeroRttReadCipher() const;
@@ -151,7 +152,7 @@ class QuicReadCodec {
private: private:
CodecResult parseLongHeaderPacket( CodecResult parseLongHeaderPacket(
folly::IOBufQueue& queue, BufQueue& queue,
const AckStates& ackStates); const AckStates& ackStates);
std::string connIdToHex(); std::string connIdToHex();

View File

@@ -32,6 +32,7 @@ quic_add_test(TARGET QuicWriteCodecTest
QuicWriteCodecTest.cpp QuicWriteCodecTest.cpp
DEPENDS DEPENDS
Folly::folly Folly::folly
mvfst_bufutil
mvfst_codec mvfst_codec
mvfst_codec_decode mvfst_codec_decode
mvfst_codec_types mvfst_codec_types

View File

@@ -124,6 +124,37 @@ TEST_F(QuicReadCodecTest, RetryPacketTest) {
EXPECT_EQ(headerOut.getToken(), expected); EXPECT_EQ(headerOut.getToken(), expected);
} }
TEST_F(QuicReadCodecTest, LongHeaderPacketLenMismatch) {
LongHeader headerIn(
LongHeader::Types::Initial,
getTestConnectionId(70),
getTestConnectionId(90),
321,
QuicVersion::MVFST,
std::string("fluffydog"),
getTestConnectionId(110));
RegularQuicPacketBuilder builder(
kDefaultUDPSendPacketLen, std::move(headerIn), 0 /* largestAcked */);
builder.setCipherOverhead(0);
writeCryptoFrame(0, folly::IOBuf::copyBuffer("CHLO"), builder);
auto packet = packetToBuf(std::move(builder).buildPacket());
auto packetQueue = bufToQueue(std::move(packet));
auto tmp = packetQueue.move();
tmp->coalesce();
tmp->trimEnd(1);
packetQueue.append(std::move(tmp));
AckStates ackStates;
auto codec = makeUnencryptedCodec();
codec->setInitialReadCipher(createNoOpAead());
codec->setInitialHeaderCipher(test::createNoOpHeaderCipher());
auto result = codec->parsePacket(packetQueue, ackStates);
auto nothing = result.nothing();
EXPECT_NE(nothing, nullptr);
}
TEST_F(QuicReadCodecTest, EmptyVersionNegotiationPacketTest) { TEST_F(QuicReadCodecTest, EmptyVersionNegotiationPacketTest) {
auto srcConnId = getTestConnectionId(0), destConnId = getTestConnectionId(1); auto srcConnId = getTestConnectionId(0), destConnId = getTestConnectionId(1);
std::vector<QuicVersion> versions; std::vector<QuicVersion> versions;

View File

@@ -9,13 +9,13 @@
#include <quic/codec/QuicWriteCodec.h> #include <quic/codec/QuicWriteCodec.h>
#include <folly/Random.h> #include <folly/Random.h>
#include <folly/io/Cursor.h> #include <folly/io/Cursor.h>
#include <folly/io/IOBufQueue.h>
#include <folly/portability/GMock.h> #include <folly/portability/GMock.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
#include <quic/QuicException.h> #include <quic/QuicException.h>
#include <quic/codec/Decode.h> #include <quic/codec/Decode.h>
#include <quic/codec/Types.h> #include <quic/codec/Types.h>
#include <quic/codec/test/Mocks.h> #include <quic/codec/test/Mocks.h>
#include <quic/common/BufUtil.h>
#include <quic/common/test/TestUtils.h> #include <quic/common/test/TestUtils.h>
using namespace quic; using namespace quic;

View File

@@ -4,13 +4,10 @@
namespace quic { namespace quic {
Buf BufQueue::split(size_t len) { Buf BufQueue::splitAtMost(size_t len) {
Buf result; Buf result;
folly::IOBuf* current = chain_.get(); folly::IOBuf* current = chain_.get();
if (current == nullptr && len > 0) { if (current == nullptr) {
throw std::underflow_error(
"Attempt to remove more bytes than are present in BufQueue");
} else if (current == nullptr) {
DCHECK_EQ(chainLength_, 0); DCHECK_EQ(chainLength_, 0);
return folly::IOBuf::create(0); return folly::IOBuf::create(0);
} }
@@ -43,12 +40,16 @@ Buf BufQueue::split(size_t len) {
} }
current = current->next(); current = current->next();
if (current == chain_.get()) { if (current == chain_.get()) {
throw std::underflow_error( break;
"Attempt to remove more bytes than are present in BufQueue");
} }
} }
chainLength_ -= len; if (remaining > 0) {
DCHECK(chainLength_ == 0 || chain_); // We did not find all the data we needed, so we are going to consume the
// entire chain instead.
result = std::move(chain_);
}
chainLength_ -= (len - remaining);
DCHECK_EQ(chainLength_, chain_ ? chain_->computeChainDataLength() : 0);
if (UNLIKELY(result == nullptr)) { if (UNLIKELY(result == nullptr)) {
return folly::IOBuf::create(0); return folly::IOBuf::create(0);
} }

View File

@@ -50,7 +50,7 @@ class BufQueue {
return chain_.get(); return chain_.get();
} }
Buf split(size_t n); Buf splitAtMost(size_t n);
size_t trimStartAtMost(size_t amount); size_t trimStartAtMost(size_t amount);

View File

@@ -78,55 +78,59 @@ TEST(BufQueue, Split) {
checkConsistency(queue); checkConsistency(queue);
EXPECT_EQ(12, queue.front()->computeChainDataLength()); EXPECT_EQ(12, queue.front()->computeChainDataLength());
unique_ptr<IOBuf> prefix(queue.split(1)); unique_ptr<IOBuf> prefix(queue.splitAtMost(1));
checkConsistency(queue); checkConsistency(queue);
EXPECT_EQ(1, prefix->computeChainDataLength()); EXPECT_EQ(1, prefix->computeChainDataLength());
EXPECT_EQ(11, queue.front()->computeChainDataLength()); EXPECT_EQ(11, queue.front()->computeChainDataLength());
prefix = queue.split(2); prefix = queue.splitAtMost(2);
checkConsistency(queue); checkConsistency(queue);
EXPECT_EQ(2, prefix->computeChainDataLength()); EXPECT_EQ(2, prefix->computeChainDataLength());
EXPECT_EQ(9, queue.front()->computeChainDataLength()); EXPECT_EQ(9, queue.front()->computeChainDataLength());
prefix = queue.split(3); prefix = queue.splitAtMost(3);
checkConsistency(queue); checkConsistency(queue);
EXPECT_EQ(3, prefix->computeChainDataLength()); EXPECT_EQ(3, prefix->computeChainDataLength());
EXPECT_EQ(6, queue.front()->computeChainDataLength()); EXPECT_EQ(6, queue.front()->computeChainDataLength());
prefix = queue.split(1); prefix = queue.splitAtMost(1);
checkConsistency(queue); checkConsistency(queue);
EXPECT_EQ(1, prefix->computeChainDataLength()); EXPECT_EQ(1, prefix->computeChainDataLength());
EXPECT_EQ(5, queue.front()->computeChainDataLength()); EXPECT_EQ(5, queue.front()->computeChainDataLength());
prefix = queue.split(5); prefix = queue.splitAtMost(5);
checkConsistency(queue); checkConsistency(queue);
EXPECT_EQ(5, prefix->computeChainDataLength()); EXPECT_EQ(5, prefix->computeChainDataLength());
EXPECT_EQ((IOBuf*)nullptr, queue.front()); EXPECT_EQ((IOBuf*)nullptr, queue.front());
queue.append(IOBuf::copyBuffer(SCL("Hello,"))); queue.append(IOBuf::copyBuffer(SCL("Hello,")));
prefix = queue.split(3); prefix = queue.splitAtMost(3);
EXPECT_EQ(3, prefix->computeChainDataLength()); EXPECT_EQ(3, prefix->computeChainDataLength());
EXPECT_EQ(3, queue.chainLength()); EXPECT_EQ(3, queue.chainLength());
checkConsistency(queue); checkConsistency(queue);
queue.append(IOBuf::copyBuffer(SCL(" World"))); queue.append(IOBuf::copyBuffer(SCL(" World")));
checkConsistency(queue); checkConsistency(queue);
EXPECT_THROW({ prefix = queue.split(13); }, std::underflow_error);
prefix = queue.splitAtMost(13);
EXPECT_EQ(9, prefix->computeChainDataLength());
EXPECT_EQ(0, queue.chainLength());
checkConsistency(queue); checkConsistency(queue);
} }
TEST(BufQueue, SplitZero) { TEST(BufQueue, SplitZero) {
BufQueue queue; BufQueue queue;
queue.append(IOBuf::copyBuffer(SCL("Hello world"))); queue.append(IOBuf::copyBuffer(SCL("Hello world")));
auto buf = queue.split(0); auto buf = queue.splitAtMost(0);
EXPECT_EQ(buf->computeChainDataLength(), 0); EXPECT_EQ(buf->computeChainDataLength(), 0);
} }
TEST(BufQueue, SplitEmpty) { TEST(BufQueue, SplitEmpty) {
BufQueue queue; BufQueue queue;
auto buf = queue.split(0); auto buf = queue.splitAtMost(0);
EXPECT_EQ(buf->computeChainDataLength(), 0); EXPECT_EQ(buf->computeChainDataLength(), 0);
} }
TEST(BufQueue, SplitEmptyInvalid) { TEST(BufQueue, SplitEmptt) {
BufQueue queue; BufQueue queue;
EXPECT_THROW(queue.split(1), std::underflow_error); auto res = queue.splitAtMost(1);
EXPECT_EQ(res->computeChainDataLength(), 0);
} }
TEST(BufQueue, TrimStartAtMost) { TEST(BufQueue, TrimStartAtMost) {

View File

@@ -556,8 +556,8 @@ CongestionController::AckEvent makeAck(
return ack; return ack;
} }
folly::IOBufQueue bufToQueue(Buf buf) { BufQueue bufToQueue(Buf buf) {
folly::IOBufQueue queue{folly::IOBufQueue::cacheChainLength()}; BufQueue queue;
buf->coalesce(); buf->coalesce();
queue.append(std::move(buf)); queue.append(std::move(buf));
return queue; return queue;

View File

@@ -11,6 +11,7 @@
#include <quic/client/handshake/QuicPskCache.h> #include <quic/client/handshake/QuicPskCache.h>
#include <quic/codec/QuicPacketBuilder.h> #include <quic/codec/QuicPacketBuilder.h>
#include <quic/codec/Types.h> #include <quic/codec/Types.h>
#include <quic/common/BufUtil.h>
#include <quic/handshake/test/Mocks.h> #include <quic/handshake/test/Mocks.h>
#include <quic/logging/FileQLogger.h> #include <quic/logging/FileQLogger.h>
#include <quic/server/state/ServerStateMachine.h> #include <quic/server/state/ServerStateMachine.h>
@@ -224,7 +225,7 @@ CongestionController::AckEvent makeAck(
TimePoint ackedTime, TimePoint ackedTime,
TimePoint sendTime); TimePoint sendTime);
folly::IOBufQueue bufToQueue(Buf buf); BufQueue bufToQueue(Buf buf);
StatelessResetToken generateStatelessResetToken(); StatelessResetToken generateStatelessResetToken();

View File

@@ -19,6 +19,7 @@
#include <quic/api/QuicSocket.h> #include <quic/api/QuicSocket.h>
#include <quic/client/QuicClientTransport.h> #include <quic/client/QuicClientTransport.h>
#include <quic/client/handshake/FizzClientQuicHandshakeContext.h> #include <quic/client/handshake/FizzClientQuicHandshakeContext.h>
#include <quic/common/BufUtil.h>
#include <quic/common/test/TestUtils.h> #include <quic/common/test/TestUtils.h>
namespace quic { namespace quic {
@@ -155,7 +156,7 @@ class EchoClient : public quic::QuicSocket::ConnectionCallback,
~EchoClient() override = default; ~EchoClient() override = default;
private: private:
void sendMessage(quic::StreamId id, folly::IOBufQueue& data) { void sendMessage(quic::StreamId id, BufQueue& data) {
auto message = data.move(); auto message = data.move();
auto res = quicClient_->writeChain(id, message->clone(), true, false); auto res = quicClient_->writeChain(id, message->clone(), true, false);
if (res.hasError()) { if (res.hasError()) {
@@ -178,7 +179,7 @@ class EchoClient : public quic::QuicSocket::ConnectionCallback,
uint16_t port_; uint16_t port_;
bool prEnabled_; bool prEnabled_;
std::shared_ptr<quic::QuicClientTransport> quicClient_; std::shared_ptr<quic::QuicClientTransport> quicClient_;
std::map<quic::StreamId, folly::IOBufQueue> pendingOutput_; std::map<quic::StreamId, BufQueue> pendingOutput_;
std::map<quic::StreamId, uint64_t> recvOffsets_; std::map<quic::StreamId, uint64_t> recvOffsets_;
}; };
} // namespace samples } // namespace samples

View File

@@ -10,8 +10,8 @@
#include <quic/api/QuicSocket.h> #include <quic/api/QuicSocket.h>
#include <folly/io/IOBufQueue.h>
#include <folly/io/async/EventBase.h> #include <folly/io/async/EventBase.h>
#include <quic/common/BufUtil.h>
namespace quic { namespace quic {
namespace samples { namespace samples {
@@ -19,7 +19,7 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
public quic::QuicSocket::ReadCallback, public quic::QuicSocket::ReadCallback,
public quic::QuicSocket::WriteCallback { public quic::QuicSocket::WriteCallback {
public: public:
using StreamData = std::pair<folly::IOBufQueue, bool>; using StreamData = std::pair<BufQueue, bool>;
explicit EchoHandler(folly::EventBase* evbIn, bool prEnabled = false) explicit EchoHandler(folly::EventBase* evbIn, bool prEnabled = false)
: evb(evbIn), prEnabled_(prEnabled) {} : evb(evbIn), prEnabled_(prEnabled) {}
@@ -62,10 +62,7 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
return; return;
} }
if (input_.find(id) == input_.end()) { if (input_.find(id) == input_.end()) {
input_.emplace( input_.emplace(id, std::make_pair(BufQueue(), false));
id,
std::make_pair(
folly::IOBufQueue(folly::IOBufQueue::cacheChainLength()), false));
} }
quic::Buf data = std::move(res.value().first); quic::Buf data = std::move(res.value().first);
bool eof = res.value().second; bool eof = res.value().second;
@@ -145,7 +142,7 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
} }
} }
originalData.split(toSplit); originalData.splitAtMost(toSplit);
auto res = sock->writeChain(id, originalData.move(), true, false, nullptr); auto res = sock->writeChain(id, originalData.move(), true, false, nullptr);
if (res.hasError()) { if (res.hasError()) {

View File

@@ -8,6 +8,7 @@
#include <quic/server/state/ServerStateMachine.h> #include <quic/server/state/ServerStateMachine.h>
#include <quic/common/BufUtil.h>
#include <quic/congestion_control/CongestionControllerFactory.h> #include <quic/congestion_control/CongestionControllerFactory.h>
#include <quic/flowcontrol/QuicFlowController.h> #include <quic/flowcontrol/QuicFlowController.h>
#include <quic/handshake/FizzCryptoFactory.h> #include <quic/handshake/FizzCryptoFactory.h>
@@ -604,7 +605,7 @@ void onServerReadDataFromOpen(
initialDestinationConnectionId, version); initialDestinationConnectionId, version);
conn.peerAddress = conn.originalPeerAddress; conn.peerAddress = conn.originalPeerAddress;
} }
folly::IOBufQueue udpData{folly::IOBufQueue::cacheChainLength()}; BufQueue udpData;
udpData.append(std::move(readData.networkData.data)); udpData.append(std::move(readData.networkData.data));
for (uint16_t processedPackets = 0; for (uint16_t processedPackets = 0;
!udpData.empty() && processedPackets < kMaxNumCoalescedPackets; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets;
@@ -1043,7 +1044,7 @@ void onServerReadDataFromClosed(
QuicServerConnectionState& conn, QuicServerConnectionState& conn,
ServerEvents::ReadData& readData) { ServerEvents::ReadData& readData) {
CHECK_EQ(conn.state, ServerState::Closed); CHECK_EQ(conn.state, ServerState::Closed);
folly::IOBufQueue udpData{folly::IOBufQueue::cacheChainLength()}; BufQueue udpData;
udpData.append(std::move(readData.networkData.data)); udpData.append(std::move(readData.networkData.data));
auto packetSize = udpData.empty() ? 0 : udpData.chainLength(); auto packetSize = udpData.empty() ? 0 : udpData.chainLength();
if (!conn.readCodec) { if (!conn.readCodec) {

View File

@@ -10,7 +10,6 @@
#include <quic/api/test/MockQuicSocket.h> #include <quic/api/test/MockQuicSocket.h>
#include <quic/samples/echo/EchoHandler.h> #include <quic/samples/echo/EchoHandler.h>
#include <folly/io/IOBufQueue.h>
#include <folly/io/async/EventBase.h> #include <folly/io/async/EventBase.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>

View File

@@ -248,7 +248,8 @@ std::pair<Buf, bool> readDataInOrderFromReadBuffer(
if (sinkData) { if (sinkData) {
curr->data.trimStart(toRead); curr->data.trimStart(toRead);
} else { } else {
splice = curr->data.split(toRead); splice = curr->data.splitAtMost(toRead);
DCHECK_EQ(splice->computeChainDataLength(), toRead);
} }
curr->offset += toRead; curr->offset += toRead;
if (curr->data.chainLength() == 0) { if (curr->data.chainLength() == 0) {

View File

@@ -10,7 +10,6 @@
#include <folly/Optional.h> #include <folly/Optional.h>
#include <folly/io/IOBuf.h> #include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>
#include <folly/io/async/AsyncUDPSocket.h> #include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/HHWheelTimer.h> #include <folly/io/async/HHWheelTimer.h>
#include <quic/QuicConstants.h> #include <quic/QuicConstants.h>