mirror of https://github.com/facebookincubator/mvfst.git synced 2025-11-09 10:00:57 +03:00
mvfst/quic/api/QuicBatchWriter.cpp
Joseph Beshay 71b8af4b1a Add new batch writer SinglePacketBackpressureBatchWriter to retry failed writes
Summary:
The existing batch writers do not handle failed writes to the AsyncUDPSocket. A packet that fails to be written is only detected as a packet loss later, when feedback is received from the peer. This negatively impacts the congestion controller because of the fake loss signal, and it artificially inflates the number of retransmitted packets/bytes.

This change adds a new batch writer (SinglePacketBackpressureBatchWriter) that retains the buffer when a write fails. On subsequent writes, the writer retries the same buffer; no new packets are scheduled until the retried buffer succeeds. A sketch of the caller-side retry flow follows the notes below.

Notes:
- To make sure that retry writes are scheduled, the write callback is installed on the socket when a buffer needs to be retried.
- The retries are for an already scheduled packet. The connection state reflects the timing of the first attempt. This could still have an impact on RTT samples, etc., but it is a milder impact compared to fake losses/retransmissions.
- Changes outside of the batch writer only affect the new batch writer; the existing batch writers do not use the new fields and are unaffected by this diff.
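
A minimal sketch of the caller-side retry flow (illustrative only; conn, sock, peerAddr and the buffer names are stand-ins, and the real driver lives in the transport's write path):

  // First attempt: the writer takes ownership of the packet buffer.
  {
    SinglePacketBackpressureBatchWriter writer(conn);
    writer.append(std::move(packetBuf), packetLen, peerAddr, &sock);
    if (writer.write(sock, peerAddr) <= 0) {
      // Failed write: the caller installs the socket write callback so a
      // retry gets scheduled; on destruction the writer parks the
      // unwritten buffer in conn.pendingWriteBatch_.buf.
    }
  }

  // Retry: the next writer constructed for this connection picks the
  // pending buffer up in its constructor and flushes it before any new
  // packet is appended.
  {
    SinglePacketBackpressureBatchWriter retryWriter(conn);
    retryWriter.write(sock, peerAddr);
  }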

Reviewed By: kvtsoy

Differential Revision: D57597576

fbshipit-source-id: 9476d71ce52e383c5946466f64bb5eecd4f5d549
2024-05-22 15:35:32 -07:00


/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/api/QuicBatchWriter.h>
namespace quic {
// BatchWriter
bool BatchWriter::needsFlush(size_t /*unused*/) {
  // By default, appending a buffer never forces an early flush.
  return false;
}
void BatchWriter::setSock(QuicAsyncUDPSocket* sock) {
if (sock && !evb_) {
fd_ = ::dup(sock->getFD());
evb_ = sock->getEventBase().get();
}
}
QuicEventBase* BatchWriter::evb() {
return evb_;
}
int BatchWriter::getAndResetFd() {
auto ret = fd_;
fd_ = -1;
return ret;
}
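// Note on fd ownership: setSock() stores a dup()'d fd so the writer keeps
// a reference that remains valid independent of the socket's lifetime;
// getAndResetFd() hands that duplicate off to the caller, which then
// becomes responsible for closing it.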
// SinglePacketBatchWriter
void SinglePacketBatchWriter::reset() {
buf_.reset();
}
bool SinglePacketBatchWriter::append(
std::unique_ptr<folly::IOBuf>&& buf,
size_t /*unused*/,
const folly::SocketAddress& /*unused*/,
QuicAsyncUDPSocket* /*unused*/) {
buf_ = std::move(buf);
// needs to be flushed
return true;
}
ssize_t SinglePacketBatchWriter::write(
QuicAsyncUDPSocket& sock,
const folly::SocketAddress& address) {
return sock.write(address, buf_);
}
// SinglePacketInplaceBatchWriter
void SinglePacketInplaceBatchWriter::reset() {
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
auto& buf = scopedBufAccessor.buf();
buf->clear();
}
bool SinglePacketInplaceBatchWriter::append(
std::unique_ptr<folly::IOBuf>&& /* buf */,
size_t /*unused*/,
const folly::SocketAddress& /*unused*/,
QuicAsyncUDPSocket* /*unused*/) {
// Always flush. This should trigger a write afterwards.
return true;
}
ssize_t SinglePacketInplaceBatchWriter::write(
QuicAsyncUDPSocket& sock,
const folly::SocketAddress& address) {
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
auto& buf = scopedBufAccessor.buf();
CHECK(!buf->isChained());
auto ret = sock.write(address, buf);
buf->clear();
return ret;
}
bool SinglePacketInplaceBatchWriter::empty() const {
ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
auto& buf = scopedBufAccessor.buf();
return buf->length() == 0;
}
// SinglePacketBackpressureBatchWriter
SinglePacketBackpressureBatchWriter::SinglePacketBackpressureBatchWriter(
QuicConnectionStateBase& conn)
: conn_(conn) {
// If we have a write to retry from a previous attempt, pick that up.
if (conn_.pendingWriteBatch_.buf) {
buf_.swap(conn_.pendingWriteBatch_.buf);
lastWriteSuccessful_ = false;
}
}
SinglePacketBackpressureBatchWriter::~SinglePacketBackpressureBatchWriter() {
  // Stash an unwritten buffer on the connection so the next writer
  // instance can pick it up and retry it.
  if (buf_ && !buf_->empty()) {
    conn_.pendingWriteBatch_.buf.swap(buf_);
  }
}
void SinglePacketBackpressureBatchWriter::reset() {
// Only clear the buffer if it's been written successfully.
// Otherwise, retain it so it can be retried.
if (lastWriteSuccessful_) {
buf_.reset(nullptr);
}
}
bool SinglePacketBackpressureBatchWriter::append(
std::unique_ptr<folly::IOBuf>&& buf,
size_t /* unused */,
const folly::SocketAddress& /*unused*/,
QuicAsyncUDPSocket* /*unused*/) {
buf_ = std::move(buf);
// needs to be flushed
return true;
}
ssize_t SinglePacketBackpressureBatchWriter::write(
QuicAsyncUDPSocket& sock,
const folly::SocketAddress& address) {
auto written = sock.write(address, buf_);
lastWriteSuccessful_ = written > 0;
return written;
}
// SendmmsgPacketBatchWriter
SendmmsgPacketBatchWriter::SendmmsgPacketBatchWriter(size_t maxBufs)
: maxBufs_(maxBufs) {
bufs_.reserve(maxBufs);
}
bool SendmmsgPacketBatchWriter::empty() const {
return !currSize_;
}
size_t SendmmsgPacketBatchWriter::size() const {
return currSize_;
}
void SendmmsgPacketBatchWriter::reset() {
bufs_.clear();
currSize_ = 0;
}
bool SendmmsgPacketBatchWriter::append(
std::unique_ptr<folly::IOBuf>&& buf,
size_t size,
const folly::SocketAddress& /*unused*/,
QuicAsyncUDPSocket* /*unused*/) {
CHECK_LT(bufs_.size(), maxBufs_);
bufs_.emplace_back(std::move(buf));
currSize_ += size;
// reached max buffers
if (FOLLY_UNLIKELY(bufs_.size() == maxBufs_)) {
return true;
}
// does not need to be flushed yet
return false;
}
ssize_t SendmmsgPacketBatchWriter::write(
QuicAsyncUDPSocket& sock,
const folly::SocketAddress& address) {
CHECK_GT(bufs_.size(), 0);
if (bufs_.size() == 1) {
return sock.write(address, bufs_[0]);
}
int ret = sock.writem(
folly::range(&address, &address + 1), bufs_.data(), bufs_.size());
if (ret <= 0) {
return ret;
}
if (static_cast<size_t>(ret) == bufs_.size()) {
return currSize_;
}
// this is a partial write - we just need to
// return a different number than currSize_
return 0;
}
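// Illustration of the convention above (hypothetical sizes): with three
// 1200-byte buffers queued, currSize_ == 3600; writem() returning 3 makes
// write() return 3600 (a full flush), while returning 2 makes it return 0,
// letting the caller detect the partial write.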
bool useSinglePacketInplaceBatchWriter(
uint32_t maxBatchSize,
quic::DataPathType dataPathType) {
return maxBatchSize == 1 &&
dataPathType == quic::DataPathType::ContinuousMemory;
}
} // namespace quic
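// A minimal usage sketch for useSinglePacketInplaceBatchWriter (hypothetical
// call site): the in-place writer is only eligible when batching is
// effectively disabled and the continuous-memory datapath owns the buffer.
//
//   bool inplace = quic::useSinglePacketInplaceBatchWriter(
//       /*maxBatchSize=*/1, quic::DataPathType::ContinuousMemory);   // true
//   bool batched = quic::useSinglePacketInplaceBatchWriter(
//       /*maxBatchSize=*/16, quic::DataPathType::ContinuousMemory);  // false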