/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include <quic/api/QuicBatchWriter.h>

#if !FOLLY_MOBILE
#define USE_THREAD_LOCAL_BATCH_WRITER 1
#else
#define USE_THREAD_LOCAL_BATCH_WRITER 0
#endif

namespace {
// There is a known problem in the CloningScheduler: it may write a packet
// that is a few bytes larger than the original packet. If the original packet
// is a full packet, the new packet will be larger than a full packet.
constexpr size_t kPacketSizeViolationTolerance = 10;

#if USE_THREAD_LOCAL_BATCH_WRITER
class ThreadLocalBatchWriterCache : public folly::AsyncTimeout {
 private:
  ThreadLocalBatchWriterCache() = default;

  // we need to handle the case where the thread is being destroyed
  // while the EventBase has an outstanding timer
  struct Holder {
    Holder() = default;

    ~Holder() {
      if (ptr_) {
        ptr_->decRef();
      }
    }
    ThreadLocalBatchWriterCache* ptr_{nullptr};
  };
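
  // Reference counting: the Holder keeps one reference for the lifetime of
  // the thread, and every scheduled timeout takes another, so the cache can
  // safely outlive a timer that fires while the thread is being torn down.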
  void addRef() {
    ++count_;
  }

  void decRef() {
    if (--count_ == 0) {
      delete this;
    }
  }

 public:
  static ThreadLocalBatchWriterCache& getThreadLocalInstance() {
    static thread_local Holder sCache;
    if (!sCache.ptr_) {
      sCache.ptr_ = new ThreadLocalBatchWriterCache();
    }

    return *sCache.ptr_;
  }

  void timeoutExpired() noexcept override {
    timerActive_ = false;
    auto& instance = getThreadLocalInstance();
    if (instance.socket_ && instance.batchWriter_ &&
        !instance.batchWriter_->empty()) {
      // pass a default address - it is not being used by the writer
      instance.batchWriter_->write(*socket_, folly::SocketAddress());
      instance.batchWriter_->reset();
    }
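    // balances the addRef() taken when this timeout was scheduled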
    decRef();
  }

  void enable(bool val) {
    if (enabled_ != val) {
      enabled_ = val;
      batchingMode_ = quic::QuicBatchingMode::BATCHING_MODE_NONE;
      batchWriter_.reset();
    }
  }

  quic::BatchWriter* FOLLY_NULLABLE getCachedWriter(
      quic::QuicBatchingMode mode,
      const std::chrono::microseconds& threadLocalDelay) {
    enabled_ = true;
    threadLocalDelay_ = threadLocalDelay;

    if (mode == batchingMode_) {
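      // hand the cached writer to the caller; BatchWriterDeleter returns it
      // to this cache when the owning BatchWriterPtr goes away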
      return batchWriter_.release();
    }

    batchingMode_ = mode;
    batchWriter_.reset();

    return nullptr;
  }

  void setCachedWriter(quic::BatchWriter* writer) {
    if (enabled_) {
      auto* evb = writer->evb();

      if (evb && !socket_) {
        auto fd = writer->getAndResetFd();
        if (fd >= 0) {
          socket_ = std::make_unique<folly::AsyncUDPSocket>(evb);
          socket_->setFD(
              folly::NetworkSocket(fd),
              folly::AsyncUDPSocket::FDOwnership::OWNS);
        }
        attachTimeoutManager(evb);
      }

      batchWriter_.reset(writer);

      // start the timer if not active
      if (evb && socket_ && !timerActive_) {
        addRef();
        timerActive_ = true;
        evb->scheduleTimeoutHighRes(this, threadLocalDelay_);
      }
    } else {
      delete writer;
    }
  }

 private:
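  // starts at 1: the reference held by the thread-local Holder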
  std::atomic<uint32_t> count_{1};
  bool enabled_{false};
  bool timerActive_{false};
  std::chrono::microseconds threadLocalDelay_{1000};
  quic::QuicBatchingMode batchingMode_{
      quic::QuicBatchingMode::BATCHING_MODE_NONE};
  // intentionally a plain std::unique_ptr rather than a BatchWriterPtr, so
  // resetting it deletes the writer instead of re-entering this cache
  // through BatchWriterDeleter
  std::unique_ptr<quic::BatchWriter> batchWriter_;
  std::unique_ptr<folly::AsyncUDPSocket> socket_;
};
#endif
} // namespace

namespace quic {
// BatchWriter
bool BatchWriter::needsFlush(size_t /*unused*/) {
  return false;
}

// SinglePacketBatchWriter
void SinglePacketBatchWriter::reset() {
  buf_.reset();
}

bool SinglePacketBatchWriter::append(
    std::unique_ptr<folly::IOBuf>&& buf,
    size_t /*unused*/,
    const folly::SocketAddress& /*unused*/,
    folly::AsyncUDPSocket* /*unused*/) {
  buf_ = std::move(buf);

  // needs to be flushed
  return true;
}

ssize_t SinglePacketBatchWriter::write(
    folly::AsyncUDPSocket& sock,
    const folly::SocketAddress& address) {
  return sock.write(address, buf_);
}

// GSOPacketBatchWriter
GSOPacketBatchWriter::GSOPacketBatchWriter(size_t maxBufs)
    : maxBufs_(maxBufs) {}

void GSOPacketBatchWriter::reset() {
  buf_.reset(nullptr);
  currBufs_ = 0;
  prevSize_ = 0;
}

bool GSOPacketBatchWriter::needsFlush(size_t size) {
  // if we get a buffer with a size that is greater
  // than the prev one we need to flush
  return (prevSize_ && (size > prevSize_));
}
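
// UDP GSO sends a chain as consecutive segments of one fixed size; every
// segment except the last must be exactly that size. That is why append()
// accepts equal or smaller sizes, while a larger one forces a flush above.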
bool GSOPacketBatchWriter::append(
    std::unique_ptr<folly::IOBuf>&& buf,
    size_t size,
    const folly::SocketAddress& /*unused*/,
    folly::AsyncUDPSocket* /*unused*/) {
  // first buffer
  if (!buf_) {
    DCHECK_EQ(currBufs_, 0);
    buf_ = std::move(buf);
    prevSize_ = size;
    currBufs_ = 1;

    return false; // continue
  }

  // now we've got an additional buffer
  // append it to the chain
  buf_->prependChain(std::move(buf));
  currBufs_++;

  // see if we've added a different size
  if (size != prevSize_) {
    CHECK_LT(size, prevSize_);
    return true;
  }

  // reached max buffers
  if (FOLLY_UNLIKELY(currBufs_ == maxBufs_)) {
    return true;
  }

  // does not need to be flushed yet
  return false;
}

ssize_t GSOPacketBatchWriter::write(
    folly::AsyncUDPSocket& sock,
    const folly::SocketAddress& address) {
  return (currBufs_ > 1)
      ? sock.writeGSO(address, buf_, static_cast<int>(prevSize_))
      : sock.write(address, buf_);
}

GSOInplacePacketBatchWriter::GSOInplacePacketBatchWriter(
    QuicConnectionStateBase& conn,
    size_t maxPackets)
    : conn_(conn), maxPackets_(maxPackets) {}

void GSOInplacePacketBatchWriter::reset() {
  lastPacketEnd_ = nullptr;
  prevSize_ = 0;
  numPackets_ = 0;
  nextPacketSize_ = 0;
}

bool GSOInplacePacketBatchWriter::needsFlush(size_t size) {
  auto shouldFlush = prevSize_ && size > prevSize_;
  if (shouldFlush) {
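    // remember the size that triggered the flush; write() uses it to
    // validate the leftover bytes past lastPacketEnd_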
    nextPacketSize_ = size;
  }
  return shouldFlush;
}

bool GSOInplacePacketBatchWriter::append(
    std::unique_ptr<folly::IOBuf>&& /*buf*/,
    size_t size,
    const folly::SocketAddress& /* addr */,
    folly::AsyncUDPSocket* /* sock */) {
  CHECK(!needsFlush(size));
  ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
  auto& buf = scopedBufAccessor.buf();
  if (!lastPacketEnd_) {
    CHECK(prevSize_ == 0 && numPackets_ == 0);
    prevSize_ = size;
    lastPacketEnd_ = buf->tail();
    numPackets_ = 1;
    return false;
  }

  CHECK(prevSize_ && prevSize_ >= size);
  ++numPackets_;
  lastPacketEnd_ = buf->tail();
  if (prevSize_ > size || numPackets_ == maxPackets_) {
    return true;
  }
  return false;
}

/**
 * Write the buffer owned by conn_.bufAccessor to the sock, until
 * lastPacketEnd_. After the write, everything in the buffer after
 * lastPacketEnd_ is moved to the beginning of the buffer, and the buffer is
 * returned to conn_.bufAccessor.
 */
ssize_t GSOInplacePacketBatchWriter::write(
    folly::AsyncUDPSocket& sock,
    const folly::SocketAddress& address) {
  ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
  CHECK(lastPacketEnd_);
  auto& buf = scopedBufAccessor.buf();
  CHECK(!buf->isChained());
  CHECK(lastPacketEnd_ >= buf->data() && lastPacketEnd_ <= buf->tail())
      << "lastPacketEnd_=" << (const void*)lastPacketEnd_
      << " data=" << (const void*)buf->data()
      << " tail=" << (const void*)buf->tail();
  uint64_t diffToEnd = buf->tail() - lastPacketEnd_;
  CHECK(
      diffToEnd <= conn_.udpSendPacketLen ||
      (nextPacketSize_ && diffToEnd == nextPacketSize_))
      << "diffToEnd=" << diffToEnd << ", pktLimit=" << conn_.udpSendPacketLen
      << ", nextPacketSize_=" << nextPacketSize_;
  if (diffToEnd >= conn_.udpSendPacketLen + kPacketSizeViolationTolerance) {
    LOG(ERROR) << "Remaining buffer contents larger than udpSendPacketLen by "
               << (diffToEnd - conn_.udpSendPacketLen);
  }
  uint64_t diffToStart = lastPacketEnd_ - buf->data();
  buf->trimEnd(diffToEnd);
  auto bytesWritten = (numPackets_ > 1)
      ? sock.writeGSO(address, buf, static_cast<int>(prevSize_))
      : sock.write(address, buf);
  /**
   * If there are bytes after lastPacketEnd_, there is a packet we chose not
   * to write in this batch (e.g., it has a size larger than all existing
   * packets in this batch). After the socket write, we move that packet from
   * the middle of the buffer to the beginning of the buffer to make sure we
   * maximize the buffer space. An alternative is to write everything out in
   * the previous sock write call, but that needs a much bigger change in the
   * IoBufQuicBatch API.
   */
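  // trimStart() drops the bytes just written, append() extends the length to
  // cover the leftover packet past the old tail, and retreat() memmoves that
  // packet to offset 0 so the buffer is left with zero headroom.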
  if (diffToEnd) {
    buf->trimStart(diffToStart);
    buf->append(diffToEnd);
    buf->retreat(diffToStart);
    auto bufLength = buf->length();
    CHECK_EQ(diffToEnd, bufLength)
        << "diffToEnd=" << diffToEnd << ", bufLength=" << bufLength;
    CHECK(
        bufLength <= conn_.udpSendPacketLen ||
        (nextPacketSize_ && bufLength == nextPacketSize_))
        << "bufLength=" << bufLength << ", pktLimit=" << conn_.udpSendPacketLen
        << ", nextPacketSize_=" << nextPacketSize_;
    CHECK(0 == buf->headroom()) << "headroom=" << buf->headroom();
  } else {
    buf->clear();
  }
  reset();
  return bytesWritten;
}

bool GSOInplacePacketBatchWriter::empty() const {
  return numPackets_ == 0;
}

size_t GSOInplacePacketBatchWriter::size() const {
  if (empty()) {
    return 0;
  }
  ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
  CHECK(lastPacketEnd_);
  auto& buf = scopedBufAccessor.buf();
  CHECK(lastPacketEnd_ >= buf->data() && lastPacketEnd_ <= buf->tail());
  size_t ret = lastPacketEnd_ - buf->data();
  return ret;
}

// SendmmsgPacketBatchWriter
SendmmsgPacketBatchWriter::SendmmsgPacketBatchWriter(size_t maxBufs)
    : maxBufs_(maxBufs) {
  bufs_.reserve(maxBufs);
}

bool SendmmsgPacketBatchWriter::empty() const {
  return !currSize_;
}

size_t SendmmsgPacketBatchWriter::size() const {
  return currSize_;
}

void SendmmsgPacketBatchWriter::reset() {
  bufs_.clear();
  currSize_ = 0;
}

bool SendmmsgPacketBatchWriter::append(
    std::unique_ptr<folly::IOBuf>&& buf,
    size_t size,
    const folly::SocketAddress& /*unused*/,
    folly::AsyncUDPSocket* /*unused*/) {
  CHECK_LT(bufs_.size(), maxBufs_);
  bufs_.emplace_back(std::move(buf));
  currSize_ += size;

  // reached max buffers
  if (FOLLY_UNLIKELY(bufs_.size() == maxBufs_)) {
    return true;
  }

  // does not need to be flushed yet
  return false;
}

ssize_t SendmmsgPacketBatchWriter::write(
    folly::AsyncUDPSocket& sock,
    const folly::SocketAddress& address) {
  CHECK_GT(bufs_.size(), 0);
  if (bufs_.size() == 1) {
    return sock.write(address, bufs_[0]);
  }

  int ret = sock.writem(
      folly::range(&address, &address + 1), bufs_.data(), bufs_.size());

  if (ret <= 0) {
    return ret;
  }

  if (static_cast<size_t>(ret) == bufs_.size()) {
    return currSize_;
  }

  // this is a partial write - we just need to
  // return a different number than currSize_
  return 0;
}

// SendmmsgGSOPacketBatchWriter
SendmmsgGSOPacketBatchWriter::SendmmsgGSOPacketBatchWriter(size_t maxBufs)
    : maxBufs_(maxBufs) {
  bufs_.reserve(maxBufs);
}

bool SendmmsgGSOPacketBatchWriter::empty() const {
  return !currSize_;
}

size_t SendmmsgGSOPacketBatchWriter::size() const {
  return currSize_;
}

void SendmmsgGSOPacketBatchWriter::reset() {
  bufs_.clear();
  gso_.clear();
  prevSize_.clear();
  addrs_.clear();
  addrMap_.clear();

  currBufs_ = 0;
  currSize_ = 0;
}
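
// Buffers are grouped per destination address: addrMap_ maps an address to
// the index of its chain, so packets for the same peer share one GSO chain
// while packets for different peers become separate sendmmsg entries.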
bool SendmmsgGSOPacketBatchWriter::append(
    std::unique_ptr<folly::IOBuf>&& buf,
    size_t size,
    const folly::SocketAddress& addr,
    folly::AsyncUDPSocket* sock) {
  setSock(sock);
  currSize_ += size;

  // insert the entry if not present
  auto& idx = addrMap_[addr];

  // try to see if we can append
  if (idx.valid()) {
    if (size <= prevSize_[idx]) {
      if ((gso_[idx] == 0) ||
          (static_cast<size_t>(gso_[idx]) == prevSize_[idx])) {
        // we can append
        gso_[idx] = prevSize_[idx];
        prevSize_[idx] = size;
        bufs_[idx]->prependChain(std::move(buf));
        currBufs_++;

        // flush if we reach maxBufs_
        return (currBufs_ == maxBufs_);
      }
    }
  }

  // set the map index
  idx = bufs_.size();

  // add a new buffer
  bufs_.emplace_back(std::move(buf));

  // set the gso_ value to 0 for now
  // this will change if we append to this chain
  gso_.emplace_back(0);
  prevSize_.emplace_back(size);
  addrs_.emplace_back(addr);

  currBufs_++;

  // flush if we reach maxBufs_
  return (currBufs_ == maxBufs_);
}

ssize_t SendmmsgGSOPacketBatchWriter::write(
    folly::AsyncUDPSocket& sock,
    const folly::SocketAddress& /*unused*/) {
  CHECK_GT(bufs_.size(), 0);
  if (bufs_.size() == 1) {
    return (currBufs_ > 1) ? sock.writeGSO(addrs_[0], bufs_[0], gso_[0])
                           : sock.write(addrs_[0], bufs_[0]);
  }

  int ret = sock.writemGSO(
      folly::range(addrs_.data(), addrs_.data() + addrs_.size()),
      bufs_.data(),
      bufs_.size(),
      gso_.data());

  if (ret <= 0) {
    return ret;
  }

  if (static_cast<size_t>(ret) == bufs_.size()) {
    return currSize_;
  }

  // this is a partial write - we just need to
  // return a different number than currSize_
  return 0;
}

// BatchWriterDeleter
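// On non-mobile builds the deleter does not delete: it hands the writer back
// to the thread-local cache, which may reuse its fd and schedule a delayed
// flush of any pending packets.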
void BatchWriterDeleter::operator()(BatchWriter* batchWriter) {
#if USE_THREAD_LOCAL_BATCH_WRITER
  ThreadLocalBatchWriterCache::getThreadLocalInstance().setCachedWriter(
      batchWriter);
#else
  delete batchWriter;
#endif
}

// BatchWriterFactory
BatchWriterPtr BatchWriterFactory::makeBatchWriter(
    folly::AsyncUDPSocket& sock,
    const quic::QuicBatchingMode& batchingMode,
    uint32_t batchSize,
    bool useThreadLocal,
    const std::chrono::microseconds& threadLocalDelay,
    DataPathType dataPathType,
    QuicConnectionStateBase& conn) {
#if USE_THREAD_LOCAL_BATCH_WRITER
  if (useThreadLocal &&
      (batchingMode == quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO) &&
      sock.getGSO() >= 0) {
    BatchWriterPtr ret(
        ThreadLocalBatchWriterCache::getThreadLocalInstance().getCachedWriter(
            batchingMode, threadLocalDelay));

    if (ret) {
      return ret;
    }
  } else {
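    // any other configuration disables the thread-local cache for this
    // thread: the cached writer is dropped and future writers are deleted
    // rather than cached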
    ThreadLocalBatchWriterCache::getThreadLocalInstance().enable(false);
  }
#else
  (void)useThreadLocal;
#endif

  switch (batchingMode) {
    case quic::QuicBatchingMode::BATCHING_MODE_NONE:
      return BatchWriterPtr(new SinglePacketBatchWriter());
    case quic::QuicBatchingMode::BATCHING_MODE_GSO: {
      if (sock.getGSO() >= 0) {
        if (dataPathType == DataPathType::ChainedMemory) {
          return BatchWriterPtr(new GSOPacketBatchWriter(batchSize));
        }
        return BatchWriterPtr(new GSOInplacePacketBatchWriter(conn, batchSize));
      }

      return BatchWriterPtr(new SinglePacketBatchWriter());
    }
    case quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG:
      return BatchWriterPtr(new SendmmsgPacketBatchWriter(batchSize));
    case quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG_GSO: {
      if (sock.getGSO() >= 0) {
        return BatchWriterPtr(new SendmmsgGSOPacketBatchWriter(batchSize));
      }

      return BatchWriterPtr(new SendmmsgPacketBatchWriter(batchSize));
    }
      // no default so we can catch missing case at compile time
  }

  folly::assume_unreachable();
}

} // namespace quic