1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-11-10 21:22:20 +03:00

Replace IOBufQueue in retransmission buffer.

Summary:
`IOBufQueue` has some facilities for fast appending to the tail. This is not useful for us in the retransmission buffer usecase, and probably not at all. Flushing the tail cache from the `IOBufQueue` is expensive when we have to shuffle around the retransmission buffer queue on removal.

This diff replaces `IOBufQueue` with a bespoke version that only has some of the functionality.

This also changes the dependent peek APIs to use `IOBuf`s directly.

Reviewed By: siyengar, yangchi

Differential Revision: D18126437

fbshipit-source-id: a2fec0f45a72459855700c605bfd0d863a9067b7
This commit is contained in:
Matt Joras
2019-11-20 12:02:35 -08:00
committed by Facebook Github Bot
parent fe92d2a5ce
commit cf783ae678
11 changed files with 317 additions and 9 deletions

View File

@@ -26,6 +26,7 @@ target_compile_options(
add_dependencies(
mvfst_transport
mvfst_bufutil
mvfst_cc_algo
mvfst_codec
mvfst_codec_pktbuilder
@@ -49,6 +50,7 @@ add_dependencies(
target_link_libraries(
mvfst_transport PUBLIC
Folly::folly
mvfst_bufutil
mvfst_cc_algo
mvfst_codec
mvfst_codec_pktbuilder

View File

@@ -135,6 +135,18 @@ folly::Optional<uint64_t> writeStreamFrameHeader(
return folly::make_optional(dataLen);
}
void writeStreamFrameData(
PacketBuilderInterface& builder,
const BufQueue& writeBuffer,
uint64_t dataLen) {
if (dataLen > 0) {
Buf streamData;
folly::io::Cursor cursor(writeBuffer.front());
cursor.clone(streamData, dataLen);
builder.insert(std::move(streamData));
}
}
void writeStreamFrameData(
PacketBuilderInterface& builder,
const folly::IOBufQueue& writeBuffer,

View File

@@ -80,6 +80,11 @@ folly::Optional<uint64_t> writeStreamFrameHeader(
* parameter builder. This should only be called after a complete stream header
* has been written by writeStreamFrameHeader.
*/
void writeStreamFrameData(
PacketBuilderInterface& builder,
const BufQueue& writeBuffer,
uint64_t dataLen);
void writeStreamFrameData(
PacketBuilderInterface& builder,
const folly::IOBufQueue& writeBuffer,

View File

@@ -11,11 +11,11 @@
#include <folly/Conv.h>
#include <folly/Optional.h>
#include <folly/io/Cursor.h>
#include <folly/io/IOBuf.h>
#include <quic/QuicConstants.h>
#include <quic/QuicException.h>
#include <quic/codec/QuicConnectionId.h>
#include <quic/codec/QuicInteger.h>
#include <quic/common/BufUtil.h>
#include <quic/common/IntervalSet.h>
#include <quic/common/Variant.h>
@@ -26,8 +26,6 @@
namespace quic {
using Buf = std::unique_ptr<folly::IOBuf>;
using StreamId = uint64_t;
using PacketNum = uint64_t;

77
quic/common/BufUtil.cpp Normal file
View File

@@ -0,0 +1,77 @@
// Copyright 2004-present Facebook. All Rights Reserved.
#include "quic/common/BufUtil.h"
namespace quic {
Buf BufQueue::split(size_t n) {
Buf result;
while (n != 0) {
if (chain_ == nullptr) {
throw std::underflow_error(
"Attempt to remove more bytes than are present in BufQueue");
} else if (chain_->length() <= n) {
n -= chain_->length();
chainLength_ -= chain_->length();
Buf remainder = chain_->pop();
appendToChain(result, std::move(chain_));
chain_ = std::move(remainder);
} else {
Buf clone = chain_->cloneOne();
clone->trimEnd(clone->length() - n);
appendToChain(result, std::move(clone));
chain_->trimStart(n);
chainLength_ -= n;
break;
}
}
if (UNLIKELY(result == nullptr)) {
return folly::IOBuf::create(0);
}
return result;
}
size_t BufQueue::trimStartAtMost(size_t amount) {
auto original = amount;
while (amount > 0) {
if (!chain_) {
break;
}
if (chain_->length() > amount) {
chain_->trimStart(amount);
chainLength_ -= amount;
amount = 0;
break;
}
amount -= chain_->length();
chainLength_ -= chain_->length();
chain_ = chain_->pop();
}
return original - amount;
}
// TODO replace users with trimStartAtMost
void BufQueue::trimStart(size_t amount) {
auto trimmed = trimStartAtMost(amount);
if (trimmed != amount) {
throw std::underflow_error(
"Attempt to trim more bytes than are present in BufQueue");
}
}
void BufQueue::append(Buf&& buf) {
if (!buf || buf->empty()) {
return;
}
chainLength_ += buf->computeChainDataLength();
appendToChain(chain_, std::move(buf));
}
void BufQueue::appendToChain(Buf& dst, Buf&& src) {
if (dst == nullptr) {
dst = std::move(src);
} else {
dst->prependChain(std::move(src));
}
}
} // namespace quic

67
quic/common/BufUtil.h Normal file
View File

@@ -0,0 +1,67 @@
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include <folly/io/IOBuf.h>
namespace quic {
using Buf = std::unique_ptr<folly::IOBuf>;
class BufQueue {
public:
BufQueue() = default;
BufQueue(Buf chain) : chain_(std::move(chain)) {
if (chain_) {
chainLength_ = chain_->computeChainDataLength();
}
}
BufQueue(BufQueue&& other) noexcept
: chain_(std::move(other.chain_)), chainLength_(other.chainLength_) {
other.chainLength_ = 0;
}
BufQueue& operator=(BufQueue&& other) {
if (&other != this) {
chain_ = std::move(other.chain_);
chainLength_ = other.chainLength_;
other.chainLength_ = 0;
}
return *this;
}
BufQueue(const BufQueue&) = delete;
BufQueue& operator=(const BufQueue&) = delete;
size_t chainLength() const {
return chainLength_;
}
bool empty() const {
return chainLength_ == 0;
}
Buf move() {
chainLength_ = 0;
return std::move(chain_);
}
const folly::IOBuf* front() const {
return chain_.get();
}
Buf split(size_t n);
size_t trimStartAtMost(size_t amount);
void trimStart(size_t amount);
void append(Buf&& buf);
private:
void appendToChain(Buf& dst, Buf&& src);
Buf chain_;
size_t chainLength_{0};
};
} // namespace quic

View File

@@ -26,6 +26,28 @@ target_link_libraries(
Folly::folly
)
add_library(
mvfst_bufutil STATIC
BufUtil.cpp
)
target_include_directories(
mvfst_bufutil PUBLIC
$<BUILD_INTERFACE:${QUIC_FBCODE_ROOT}>
$<INSTALL_INTERFACE:include/>
)
target_compile_options(
mvfst_bufutil
PRIVATE
${_QUIC_COMMON_COMPILE_OPTIONS}
)
target_link_libraries(
mvfst_bufutil PUBLIC
Folly::folly
)
file(
GLOB_RECURSE QUIC_API_HEADERS_TOINSTALL
RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
@@ -43,4 +65,11 @@ install(
DESTINATION lib
)
install(
TARGETS mvfst_bufutil
EXPORT mvfst-exports
DESTINATION lib
)
add_subdirectory(test)

View File

@@ -0,0 +1,113 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
#include <gtest/gtest.h>
#include <quic/common/BufUtil.h>
using namespace std;
using namespace folly;
using namespace quic;
#define SCL(x) (x), sizeof(x) - 1
namespace {
void checkConsistency(const BufQueue& queue) {
size_t len = queue.front() ? queue.front()->computeChainDataLength() : 0;
EXPECT_EQ(len, queue.chainLength());
}
} // namespace
TEST(BufQueue, Append) {
BufQueue queue;
queue.append(IOBuf::copyBuffer(SCL("Hello")));
BufQueue queue2;
queue2.append(IOBuf::copyBuffer(SCL(", ")));
queue2.append(IOBuf::copyBuffer(SCL("World")));
checkConsistency(queue);
checkConsistency(queue2);
queue.append(queue2.move());
checkConsistency(queue);
checkConsistency(queue2);
const IOBuf* chain = queue.front();
EXPECT_NE((IOBuf*)nullptr, chain);
EXPECT_EQ(12, chain->computeChainDataLength());
EXPECT_EQ(nullptr, queue2.front());
}
TEST(BufQueue, Append2) {
BufQueue queue;
queue.append(IOBuf::copyBuffer(SCL("Hello")));
BufQueue queue2;
queue2.append(IOBuf::copyBuffer(SCL(", ")));
queue2.append(IOBuf::copyBuffer(SCL("World")));
checkConsistency(queue);
checkConsistency(queue2);
}
TEST(BufQueue, AppendStringPiece) {
std::string s("Hello, World");
BufQueue queue;
BufQueue queue2;
queue.append(IOBuf::copyBuffer(s.data(), s.length()));
queue2.append(IOBuf::copyBuffer(s));
checkConsistency(queue);
checkConsistency(queue2);
const IOBuf* chain = queue.front();
const IOBuf* chain2 = queue2.front();
EXPECT_EQ(s.length(), chain->computeChainDataLength());
EXPECT_EQ(s.length(), chain2->computeChainDataLength());
EXPECT_EQ(0, memcmp(chain->data(), chain2->data(), s.length()));
}
TEST(BufQueue, Split) {
BufQueue queue;
queue.append(IOBuf::copyBuffer(SCL("Hello")));
queue.append(IOBuf::copyBuffer(SCL(",")));
queue.append(IOBuf::copyBuffer(SCL(" ")));
queue.append(IOBuf::copyBuffer(SCL("")));
queue.append(IOBuf::copyBuffer(SCL("World")));
checkConsistency(queue);
EXPECT_EQ(12, queue.front()->computeChainDataLength());
unique_ptr<IOBuf> prefix(queue.split(1));
checkConsistency(queue);
EXPECT_EQ(1, prefix->computeChainDataLength());
EXPECT_EQ(11, queue.front()->computeChainDataLength());
prefix = queue.split(2);
checkConsistency(queue);
EXPECT_EQ(2, prefix->computeChainDataLength());
EXPECT_EQ(9, queue.front()->computeChainDataLength());
prefix = queue.split(3);
checkConsistency(queue);
EXPECT_EQ(3, prefix->computeChainDataLength());
EXPECT_EQ(6, queue.front()->computeChainDataLength());
prefix = queue.split(1);
checkConsistency(queue);
EXPECT_EQ(1, prefix->computeChainDataLength());
EXPECT_EQ(5, queue.front()->computeChainDataLength());
prefix = queue.split(5);
checkConsistency(queue);
EXPECT_EQ(5, prefix->computeChainDataLength());
EXPECT_EQ((IOBuf*)nullptr, queue.front());
queue.append(IOBuf::copyBuffer(SCL("Hello,")));
queue.append(IOBuf::copyBuffer(SCL(" World")));
checkConsistency(queue);
EXPECT_THROW({ prefix = queue.split(13); }, std::underflow_error);
checkConsistency(queue);
}
TEST(BufQueue, SplitZero) {
BufQueue queue;
queue.append(IOBuf::copyBuffer(SCL("Hello world")));
auto buf = queue.split(0);
EXPECT_EQ(buf->computeChainDataLength(), 0);
}

View File

@@ -61,9 +61,11 @@ quic_add_test(TARGET QuicCommonUtilTest SOURCES
TimeUtilTest.cpp
IntervalSetTest.cpp
VariantTest.cpp
BufUtilTest.cpp
DEPENDS
Folly::folly
${LIBFIZZ_LIBRARY}
mvfst_bufutil
mvfst_client
mvfst_codec_pktbuilder
mvfst_codec_types

View File

@@ -25,6 +25,7 @@ target_compile_options(
add_dependencies(
mvfst_state_machine
mvfst_bufutil
mvfst_constants
mvfst_codec
mvfst_codec_types
@@ -35,6 +36,7 @@ target_link_libraries(
mvfst_state_machine PUBLIC
Folly::folly
${BOOST_LIBRARIES}
mvfst_bufutil
mvfst_constants
mvfst_codec
mvfst_codec_types
@@ -97,6 +99,7 @@ target_compile_options(
add_dependencies(
mvfst_state_stream_functions
mvfst_bufutil
mvfst_codec
mvfst_codec_types
mvfst_state_machine
@@ -105,6 +108,7 @@ add_dependencies(
target_link_libraries(
mvfst_state_stream_functions PUBLIC
Folly::folly
mvfst_bufutil
mvfst_codec
mvfst_codec_types
mvfst_state_machine
@@ -232,6 +236,7 @@ target_compile_options(
add_dependencies(
mvfst_state_qpr_functions
mvfst_bufutil
mvfst_state_machine
mvfst_state_stream_functions
mvfst_codec_types
@@ -241,6 +246,7 @@ add_dependencies(
target_link_libraries(
mvfst_state_qpr_functions PUBLIC
Folly::folly
mvfst_bufutil
mvfst_state_machine
mvfst_state_stream_functions
mvfst_codec_types

View File

@@ -14,16 +14,12 @@
namespace quic {
struct StreamBuffer {
folly::IOBufQueue data;
BufQueue data;
uint64_t offset;
bool eof{false};
StreamBuffer(Buf dataIn, uint64_t offsetIn, bool eofIn = false) noexcept
: data(folly::IOBufQueue::cacheChainLength()),
offset(offsetIn),
eof(eofIn) {
data.append(std::move(dataIn));
}
: data(std::move(dataIn)), offset(offsetIn), eof(eofIn) {}
StreamBuffer(StreamBuffer&& other) = default;
StreamBuffer& operator=(StreamBuffer&& other) = default;
@@ -37,6 +33,7 @@ struct QuicStreamLike {
std::deque<StreamBuffer> readBuffer;
// List of bytes that have been written to the QUIC layer.
// TODO replace with BufQueue
folly::IOBufQueue writeBuffer{folly::IOBufQueue::cacheChainLength()};
// Stores a list of buffers which have been written to the socket and are