1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-11-07 22:46:22 +03:00
Files
mvfst/quic/api/QuicTransportBase.cpp
Aman Sharma 1cc00f77a4 Move core functionality to QuicTransportBaseLite [24/n]
Summary:
There are many files in this diff, but the relevant ones are:
* quic/api/QuicSocket.h
* quic/api/QuicSocketLite.h
* quic/api/QuicTransportBase.h
* quic/api/QuicTransportBaseLite.cpp
* quic/observer/SocketObserverContainer.h

The purpose of this is to include the observer functionality within the Lite class. Ideally, we'd remove it from the Lite class, but that's going to require some more detailed changes to the interfaces we're using, because observers are used quite a lot within common code (for example, AckHandlers.cpp).

I'll make those changes some time after the QuicTransportBase split, as they're going to take a while.

Reviewed By: jbeshay, mjoras, kvtsoy

Differential Revision: D65685667

fbshipit-source-id: feb07bcf35d6af2e5c2b538ff463b01b67c6aff9
2024-11-14 13:02:10 -08:00

899 lines
30 KiB
C++

/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/api/QuicTransportBase.h>
#include <folly/Chrono.h>
#include <folly/ScopeGuard.h>
#include <quic/api/LoopDetectorCallback.h>
#include <quic/api/QuicBatchWriterFactory.h>
#include <quic/api/QuicTransportFunctions.h>
#include <quic/common/Optional.h>
#include <quic/common/TimeUtil.h>
#include <quic/congestion_control/Pacer.h>
#include <quic/logging/QLoggerConstants.h>
#include <quic/loss/QuicLossFunctions.h>
#include <quic/state/QuicPacingFunctions.h>
#include <quic/state/QuicStateFunctions.h>
#include <quic/state/QuicStreamFunctions.h>
#include <quic/state/QuicStreamUtilities.h>
#include <quic/state/SimpleFrameFunctions.h>
#include <memory>
namespace {
/**
* File-local shorthand for the generic application "no error" code.
* Passed by unsetAllReadCallbacks() when it clears read callbacks.
*/
constexpr auto APP_NO_ERROR = quic::GenericApplicationErrorCode::NO_ERROR;
} // namespace
namespace quic {
// Forwards the event base, socket and callback-mode flag to
// QuicTransportBaseLite. When a socket is present afterwards, installs a
// provider on it that returns getAdditionalCmsgsForAsyncUDPSocket().
QuicTransportBase::QuicTransportBase(
    std::shared_ptr<QuicEventBase> evb,
    std::unique_ptr<QuicAsyncUDPSocket> socket,
    bool useConnectionEndWithErrorCallback)
    : QuicTransportBaseLite(
          std::move(evb),
          std::move(socket),
          useConnectionEndWithErrorCallback) {
  if (!socket_) {
    return;
  }
  // The provider only needs access to this transport, so capture `this`
  // explicitly instead of a blanket by-reference capture.
  folly::Function<Optional<folly::SocketCmsgMap>()> cmsgsProvider = [this]() {
    return getAdditionalCmsgsForAsyncUDPSocket();
  };
  socket_->setAdditionalCmsgsFunc(std::move(cmsgsProvider));
}
// Hands the pacing timer to the write looper. A null timer is ignored, so a
// previously installed timer is left in place.
void QuicTransportBase::setPacingTimer(
    QuicTimer::SharedPtr pacingTimer) noexcept {
  if (!pacingTimer) {
    return;
  }
  writeLooper_->setPacingTimer(std::move(pacingTimer));
}
// Returns a copy of the connection's qlogger handle (may be null).
const std::shared_ptr<QLogger> QuicTransportBase::getQLogger() const {
  const auto& currentLogger = conn_->qLogger;
  return currentLogger;
}
// Installs or releases the connection's qlogger, with reference counting.
//
// setQLogger can be called several times for the same connection, possibly
// with the same logger. Every non-null call counts as one "set"; every null
// call while a logger is installed counts as one "reset". Logging only stops
// (the logger is dropped) once resets have balanced out all sets.
void QuicTransportBase::setQLogger(std::shared_ptr<QLogger> qLogger) {
  // Invariant: a logger is installed if and only if the refcount is positive.
  if (conn_->qLogger) {
    CHECK_GT(qlogRefcnt_, 0);
  } else {
    CHECK_EQ(qlogRefcnt_, 0);
  }

  if (!qLogger) {
    // Reset request: a no-op when no logger is installed.
    if (conn_->qLogger) {
      --qlogRefcnt_;
      if (qlogRefcnt_ == 0) {
        conn_->qLogger = nullptr;
      }
    }
    return;
  }

  // Set request: replace the logger and stamp it with the connection ids.
  conn_->qLogger = std::move(qLogger);
  conn_->qLogger->setDcid(conn_->clientChosenDestConnectionId);
  const bool isServer = conn_->nodeType == QuicNodeType::Server;
  if (isServer) {
    conn_->qLogger->setScid(conn_->serverConnectionId);
  } else {
    conn_->qLogger->setScid(conn_->clientConnectionId);
  }
  ++qlogRefcnt_;
}
// Returns the client connection id recorded on the connection state, if any.
Optional<ConnectionId> QuicTransportBase::getClientConnectionId() const {
  const auto& clientCid = conn_->clientConnectionId;
  return clientCid;
}
// Returns the server connection id recorded on the connection state, if any.
Optional<ConnectionId> QuicTransportBase::getServerConnectionId() const {
  const auto& serverCid = conn_->serverConnectionId;
  return serverCid;
}
// Returns the destination connection id originally chosen by the client, as
// recorded on the connection state, if any.
Optional<ConnectionId> QuicTransportBase::getClientChosenDestConnectionId()
    const {
  const auto& chosenDestCid = conn_->clientChosenDestConnectionId;
  return chosenDestCid;
}
// Base-class teardown. The derived class destructor is expected to have
// already closed the connection and released the UDP socket; the DCHECKs
// below assert exactly that in debug builds.
QuicTransportBase::~QuicTransportBase() {
resetConnectionCallbacks();
// Just in case this ended up hanging around.
cancelTimeout(&drainTimeout_);
// closeImpl and closeUdpSocket should have been triggered by destructor of
// derived class to ensure that observers are properly notified
DCHECK_NE(CloseState::OPEN, closeState_);
DCHECK(!socket_.get()); // should be no socket
}
// True once the connection holds a 1-RTT write cipher.
bool QuicTransportBase::replaySafe() const {
  const bool hasOneRttWriteCipher = conn_->oneRttWriteCipher != nullptr;
  return hasOneRttWriteCipher;
}
// Begins a graceful close: moves to GRACEFUL_CLOSING, stops the read and peek
// loopers, cancels all application callbacks with a NO_ERROR "Graceful Close"
// error, and closes the transport right away if no streams remain.
// Does nothing if the transport is already closed or gracefully closing.
void QuicTransportBase::closeGracefully() {
  const bool alreadyShuttingDown = closeState_ == CloseState::CLOSED ||
      closeState_ == CloseState::GRACEFUL_CLOSING;
  if (alreadyShuttingDown) {
    return;
  }

  // Hold a guard on this transport for the remainder of the call.
  [[maybe_unused]] auto self = sharedGuard();
  resetConnectionCallbacks();
  closeState_ = CloseState::GRACEFUL_CLOSING;
  updatePacingOnClose(*conn_);

  const auto& qLogger = conn_->qLogger;
  if (qLogger) {
    qLogger->addConnectionClose(kNoError, kGracefulExit, true, false);
  }

  // Stop reads and cancel all the app callbacks.
  VLOG(10) << "Stopping read and peek loopers due to graceful close " << *this;
  readLooper_->stop();
  peekLooper_->stop();
  cancelAllAppCallbacks(
      QuicError(QuicErrorCode(LocalErrorCode::NO_ERROR), "Graceful Close"));

  // With no streams left there is nothing to wait for: close immediately.
  const bool noStreamsLeft = conn_->streamManager->streamCount() == 0;
  if (noStreamsLeft) {
    closeImpl(none);
  }
}
// Currently a stub: ignores the stream id and always reports offset 0.
folly::Expected<size_t, LocalErrorCode> QuicTransportBase::getStreamReadOffset(
    StreamId) const {
  constexpr size_t kReportedOffset = 0;
  return kReportedOffset;
}
// Returns the stream's currentWriteOffset.
//
// Errors:
//   INVALID_OPERATION  - the stream is receive-only for this endpoint.
//   STREAM_NOT_EXISTS  - the stream manager does not know the stream.
//   Exceptions thrown while fetching the stream are logged and mapped to the
//   internal exception's own code, TRANSPORT_ERROR, or INTERNAL_ERROR.
folly::Expected<size_t, LocalErrorCode> QuicTransportBase::getStreamWriteOffset(
    StreamId id) const {
  const bool receiveOnly = isReceivingStream(conn_->nodeType, id);
  if (receiveOnly) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  const bool knownStream = conn_->streamManager->streamExists(id);
  if (!knownStream) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  try {
    auto streamState = CHECK_NOTNULL(conn_->streamManager->getStream(id));
    return streamState->currentWriteOffset;
  } catch (const QuicInternalException& ex) {
    VLOG(4) << __func__ << " " << ex.what() << " " << *this;
    return folly::makeUnexpected(ex.errorCode());
  } catch (const QuicTransportException& ex) {
    VLOG(4) << __func__ << " " << ex.what() << " " << *this;
    return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
  } catch (const std::exception& ex) {
    VLOG(4) << __func__ << " " << ex.what() << " " << *this;
    return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
  }
}
// Returns the chain length of the stream's pendingWrites buffer.
//
// Errors:
//   INVALID_OPERATION  - the stream is receive-only for this endpoint.
//   STREAM_NOT_EXISTS  - the stream manager does not know the stream.
//   Exceptions thrown while fetching the stream are logged and mapped to the
//   internal exception's own code, TRANSPORT_ERROR, or INTERNAL_ERROR.
folly::Expected<size_t, LocalErrorCode>
QuicTransportBase::getStreamWriteBufferedBytes(StreamId id) const {
  const bool receiveOnly = isReceivingStream(conn_->nodeType, id);
  if (receiveOnly) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  const bool knownStream = conn_->streamManager->streamExists(id);
  if (!knownStream) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  try {
    auto streamState = CHECK_NOTNULL(conn_->streamManager->getStream(id));
    return streamState->pendingWrites.chainLength();
  } catch (const QuicInternalException& ex) {
    VLOG(4) << __func__ << " " << ex.what() << " " << *this;
    return folly::makeUnexpected(ex.errorCode());
  } catch (const QuicTransportException& ex) {
    VLOG(4) << __func__ << " " << ex.what() << " " << *this;
    return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
  } catch (const std::exception& ex) {
    VLOG(4) << __func__ << " " << ex.what() << " " << *this;
    return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
  }
}
// Snapshots connection-level flow control: the send/receive byte counts from
// the flow-control helpers plus the peer-advertised and locally advertised
// max offsets. Never returns an error.
folly::Expected<QuicSocket::FlowControlState, LocalErrorCode>
QuicTransportBase::getConnectionFlowControl() const {
  const auto& flowControl = conn_->flowControlState;
  return QuicSocket::FlowControlState(
      getSendConnFlowControlBytesAPI(*conn_),
      flowControl.peerAdvertisedMaxOffset,
      getRecvConnFlowControlBytes(*conn_),
      flowControl.advertisedMaxOffset);
}
// Returns maxWritableOnStream() for the given stream.
//
// Errors (checked in this order):
//   STREAM_NOT_EXISTS  - the stream manager does not know the stream.
//   INVALID_OPERATION  - the stream is receive-only for this endpoint.
folly::Expected<uint64_t, LocalErrorCode>
QuicTransportBase::getMaxWritableOnStream(StreamId id) const {
  const bool knownStream = conn_->streamManager->streamExists(id);
  if (!knownStream) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  const bool receiveOnly = isReceivingStream(conn_->nodeType, id);
  if (receiveOnly) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  auto streamState = CHECK_NOTNULL(conn_->streamManager->getStream(id));
  return maxWritableOnStream(*streamState);
}
// Sets the connection-level flow control window size, lets the flow-control
// code decide whether to send a window update, and kicks the write looper.
// Fails with CONNECTION_CLOSED unless the transport is OPEN.
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setConnectionFlowControlWindow(uint64_t windowSize) {
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  auto& connFlowControl = conn_->flowControlState;
  connFlowControl.windowSize = windowSize;
  maybeSendConnWindowUpdate(*conn_, Clock::now());
  updateWriteLooper(true);
  return folly::unit;
}
// Sets the flow control window size of a single stream, lets the flow-control
// code decide whether to send a stream window update, and kicks the write
// looper.
//
// Errors:
//   CONNECTION_CLOSED  - the transport is not OPEN.
//   STREAM_NOT_EXISTS  - the stream manager does not know the stream.
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setStreamFlowControlWindow(
    StreamId id,
    uint64_t windowSize) {
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  const bool knownStream = conn_->streamManager->streamExists(id);
  if (!knownStream) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto streamState = CHECK_NOTNULL(conn_->streamManager->getStream(id));
  streamState->flowControlState.windowSize = windowSize;
  maybeSendStreamWindowUpdate(*streamState, Clock::now());
  updateWriteLooper(true);
  return folly::unit;
}
// Clears the read callback of every stream that has an entry in
// readCallbacks_, passing APP_NO_ERROR as the error code.
void QuicTransportBase::unsetAllReadCallbacks() {
  for (const auto& entry : readCallbacks_) {
    setReadCallbackInternal(entry.first, nullptr, APP_NO_ERROR);
  }
}
// Clears the peek callback of every stream that has an entry in
// peekCallbacks_.
void QuicTransportBase::unsetAllPeekCallbacks() {
  for (const auto& entry : peekCallbacks_) {
    setPeekCallbackInternal(entry.first, nullptr);
  }
}
void QuicTransportBase::unsetAllDeliveryCallbacks() {
auto deliveryCallbacksCopy = deliveryCallbacks_;
for (const auto& [id, _] : deliveryCallbacksCopy) {
cancelDeliveryCallbacksForStream(id);
}
}
// Pauses read callbacks for the stream; see pauseOrResumeRead() for errors.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::pauseRead(
    StreamId id) {
  VLOG(4) << __func__ << " " << *this << " stream=" << id;
  constexpr bool kResume = false;
  return pauseOrResumeRead(id, kResume);
}
// Resumes read callbacks for the stream; see pauseOrResumeRead() for errors.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::resumeRead(
    StreamId id) {
  VLOG(4) << __func__ << " " << *this << " stream=" << id;
  constexpr bool kResume = true;
  return pauseOrResumeRead(id, kResume);
}
// Sets the `resumed` flag on the stream's read callback entry and refreshes
// the read looper when the flag actually changes.
//
// Errors (checked in this order):
//   INVALID_OPERATION  - the stream is send-only for this endpoint.
//   CONNECTION_CLOSED  - the transport is not OPEN.
//   STREAM_NOT_EXISTS  - the stream manager does not know the stream.
//   APP_ERROR          - no read callback entry exists for the stream.
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::pauseOrResumeRead(StreamId id, bool resume) {
  const bool sendOnly = isSendingStream(conn_->nodeType, id);
  if (sendOnly) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  const bool knownStream = conn_->streamManager->streamExists(id);
  if (!knownStream) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto entry = readCallbacks_.find(id);
  if (entry == readCallbacks_.end()) {
    return folly::makeUnexpected(LocalErrorCode::APP_ERROR);
  }
  auto& callbackData = entry->second;
  if (callbackData.resumed == resume) {
    // Already in the requested state; nothing to refresh.
    return folly::unit;
  }
  callbackData.resumed = resume;
  updateReadLooper();
  return folly::unit;
}
// Installs (or, with nullptr, clears) the peek callback for a stream.
//
// Errors:
//   CONNECTION_CLOSED  - the transport is not OPEN.
//   STREAM_NOT_EXISTS  - the stream manager does not know the stream.
//   Any error reported by setPeekCallbackInternal(), e.g. INVALID_OPERATION
//   when a nullptr callback is supplied for a stream that never had one.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::setPeekCallback(
    StreamId id,
    PeekCallback* cb) {
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  if (!conn_->streamManager->streamExists(id)) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  // Propagate the internal result instead of discarding it; previously a
  // failed installation was still reported to the caller as success.
  return setPeekCallbackInternal(id, cb);
}
// Stores `cb` as the stream's peek callback and refreshes the peek looper.
// A new entry is created on first use; clearing (cb == nullptr) is only
// allowed for a stream that already has an entry, otherwise
// INVALID_OPERATION is returned and nothing changes.
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setPeekCallbackInternal(
    StreamId id,
    PeekCallback* cb) noexcept {
  VLOG(4) << "Setting setPeekCallback for stream=" << id << " cb=" << cb << " "
          << *this;
  const bool clearing = (cb == nullptr);
  auto entry = peekCallbacks_.find(id);
  if (entry == peekCallbacks_.end()) {
    // Don't allow initial setting of a nullptr callback.
    if (clearing) {
      return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
    }
    entry = peekCallbacks_.emplace(id, PeekCallbackData(cb)).first;
  } else if (clearing) {
    VLOG(10) << "Resetting the peek callback to nullptr stream=" << id
             << " peekCb=" << entry->second.peekCb;
  }
  entry->second.peekCb = cb;
  updatePeekLooper();
  return folly::unit;
}
// Pauses peek callbacks for the stream; see pauseOrResumePeek() for errors.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::pausePeek(
    StreamId id) {
  VLOG(4) << __func__ << " " << *this << " stream=" << id;
  constexpr bool kResume = false;
  return pauseOrResumePeek(id, kResume);
}
// Resumes peek callbacks for the stream; see pauseOrResumePeek() for errors.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::resumePeek(
    StreamId id) {
  VLOG(4) << __func__ << " " << *this << " stream=" << id;
  constexpr bool kResume = true;
  return pauseOrResumePeek(id, kResume);
}
// Sets the `resumed` flag on the stream's peek callback entry and refreshes
// the peek looper when the flag actually changes.
//
// Errors (checked in this order):
//   CONNECTION_CLOSED  - the transport is not OPEN.
//   STREAM_NOT_EXISTS  - the stream manager does not know the stream.
//   APP_ERROR          - no peek callback entry exists for the stream.
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::pauseOrResumePeek(StreamId id, bool resume) {
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  const bool knownStream = conn_->streamManager->streamExists(id);
  if (!knownStream) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto entry = peekCallbacks_.find(id);
  if (entry == peekCallbacks_.end()) {
    return folly::makeUnexpected(LocalErrorCode::APP_ERROR);
  }
  auto& callbackData = entry->second;
  if (callbackData.resumed == resume) {
    // Already in the requested state; nothing to refresh.
    return folly::unit;
  }
  callbackData.resumed = resume;
  updatePeekLooper();
  return folly::unit;
}
// Invokes `peekCallback` over the stream's readable data via
// peekDataFromQuicStream().
//
// Errors:
//   CONNECTION_CLOSED  - the transport is not OPEN.
//   STREAM_NOT_EXISTS  - the stream manager does not know the stream.
//   The stream's stored read error when it is a LocalErrorCode, otherwise
//   INTERNAL_ERROR, if the stream has a read error recorded.
//
// Once past the CONNECTION_CLOSED check, the peek and write loopers are
// refreshed on every exit path.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::peek(
    StreamId id,
    const folly::Function<void(StreamId id, const folly::Range<PeekIterator>&)
                              const>& peekCallback) {
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  // Hold a guard on this transport for the remainder of the call.
  [[maybe_unused]] auto self = sharedGuard();
  SCOPE_EXIT {
    updatePeekLooper();
    updateWriteLooper(true);
  };

  if (!conn_->streamManager->streamExists(id)) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));

  if (stream->streamReadError) {
    switch (stream->streamReadError->type()) {
      case QuicErrorCode::Type::LocalErrorCode:
        return folly::makeUnexpected(
            *stream->streamReadError->asLocalErrorCode());
      default:
        return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
    }
  }

  // The callback is borrowed through a const reference, so it is passed on
  // as-is; std::move on a const reference cannot move anything and only
  // suggested an ownership transfer that never happened.
  peekDataFromQuicStream(*stream, peekCallback);
  return folly::makeExpected<LocalErrorCode>(folly::Unit());
}
// Convenience overload: consumes `amount` bytes starting at the stream's
// current read offset, and reduces the detailed error of the offset-aware
// overload to its LocalErrorCode.
// Fails with STREAM_NOT_EXISTS if the stream manager does not know the stream.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::consume(
    StreamId id,
    size_t amount) {
  const bool knownStream = conn_->streamManager->streamExists(id);
  if (!knownStream) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto streamState = CHECK_NOTNULL(conn_->streamManager->getStream(id));
  auto outcome = consume(id, streamState->currentReadOffset, amount);
  if (outcome.hasError()) {
    // Keep only the error code; drop the optional read offset.
    return folly::makeUnexpected(outcome.error().first);
  }
  return folly::makeExpected<LocalErrorCode>(outcome.value());
}
// Consumes `amount` bytes from the stream, provided `offset` matches the
// stream's current read offset.
//
// On failure the error is a pair of (LocalErrorCode, optional read offset):
//   CONNECTION_CLOSED  - transport is not OPEN (no offset).
//   STREAM_NOT_EXISTS  - stream unknown (offset not yet known, so unset).
//   INTERNAL_ERROR     - `offset` differs from the current read offset; the
//                        actual current read offset is returned alongside.
//   The stream's stored read error (when it is a LocalErrorCode) or
//   INTERNAL_ERROR if the stream has a read error recorded (no offset).
//   If an exception is thrown, the connection is closed via closeImpl() and
//   TRANSPORT_ERROR, the internal exception's code, or INTERNAL_ERROR is
//   returned together with whatever read offset was captured so far.
//
// Once past the CONNECTION_CLOSED check, the peek, read and write loopers are
// refreshed on every exit path.
folly::Expected<folly::Unit, std::pair<LocalErrorCode, Optional<uint64_t>>>
QuicTransportBase::consume(StreamId id, uint64_t offset, size_t amount) {
using ConsumeError = std::pair<LocalErrorCode, Optional<uint64_t>>;
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::CONNECTION_CLOSED, none});
}
// Hold a guard on this transport for the remainder of the call.
[[maybe_unused]] auto self = sharedGuard();
SCOPE_EXIT {
updatePeekLooper();
updateReadLooper(); // consume may affect "read" API
updateWriteLooper(true);
};
// Declared outside the try block so the catch handlers can report it.
Optional<uint64_t> readOffset;
try {
// Need to check that the stream exists first so that we don't
// accidentally let the API create a peer stream that was not
// sent by the peer.
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::STREAM_NOT_EXISTS, readOffset});
}
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));
readOffset = stream->currentReadOffset;
// The caller's view of the read offset must match the stream's.
if (stream->currentReadOffset != offset) {
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::INTERNAL_ERROR, readOffset});
}
if (stream->streamReadError) {
switch (stream->streamReadError->type()) {
case QuicErrorCode::Type::LocalErrorCode:
return folly::makeUnexpected(
ConsumeError{*stream->streamReadError->asLocalErrorCode(), none});
default:
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::INTERNAL_ERROR, none});
}
}
consumeDataFromQuicStream(*stream, amount);
return folly::makeExpected<ConsumeError>(folly::Unit());
} catch (const QuicTransportException& ex) {
VLOG(4) << "consume() error " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()), std::string("consume() error")));
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::TRANSPORT_ERROR, readOffset});
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()), std::string("consume() error")));
return folly::makeUnexpected(ConsumeError{ex.errorCode(), readOffset});
} catch (const std::exception& ex) {
VLOG(4) << "consume() error " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("consume() error")));
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::INTERNAL_ERROR, readOffset});
}
}
// Asks the stream manager for the next bidirectional stream group id.
// Fails with CONNECTION_CLOSED unless the transport is OPEN.
folly::Expected<StreamGroupId, LocalErrorCode>
QuicTransportBase::createBidirectionalStreamGroup() {
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  auto& streamManager = *conn_->streamManager;
  return streamManager.createNextBidirectionalStreamGroup();
}
// Asks the stream manager for the next unidirectional stream group id.
// Fails with CONNECTION_CLOSED unless the transport is OPEN.
folly::Expected<StreamGroupId, LocalErrorCode>
QuicTransportBase::createUnidirectionalStreamGroup() {
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  auto& streamManager = *conn_->streamManager;
  return streamManager.createNextUnidirectionalStreamGroup();
}
// Creates a bidirectional stream inside the given group by delegating to
// createStreamInternal().
folly::Expected<StreamId, LocalErrorCode>
QuicTransportBase::createBidirectionalStreamInGroup(StreamGroupId groupId) {
  constexpr bool kBidirectional = true;
  return createStreamInternal(kBidirectional, groupId);
}
// Creates a unidirectional stream inside the given group by delegating to
// createStreamInternal().
folly::Expected<StreamId, LocalErrorCode>
QuicTransportBase::createUnidirectionalStreamInGroup(StreamGroupId groupId) {
  constexpr bool kBidirectional = false;
  return createStreamInternal(kBidirectional, groupId);
}
// Member-function wrapper around the free function quic::isClientStream().
bool QuicTransportBase::isClientStream(StreamId stream) noexcept {
  const bool clientInitiated = quic::isClientStream(stream);
  return clientInitiated;
}
// Member-function wrapper around the free function quic::isServerStream().
bool QuicTransportBase::isServerStream(StreamId stream) noexcept {
  const bool serverInitiated = quic::isServerStream(stream);
  return serverInitiated;
}
// Member-function wrapper around the free function
// quic::getStreamDirectionality().
StreamDirectionality QuicTransportBase::getStreamDirectionality(
    StreamId stream) noexcept {
  const auto directionality = quic::getStreamDirectionality(stream);
  return directionality;
}
// Registers `cb` as a TX byte-event callback for (stream, offset) by
// delegating to registerByteEventCallback().
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::registerTxCallback(
    StreamId id,
    uint64_t offset,
    ByteEventCallback* cb) {
  constexpr auto kEventType = ByteEvent::Type::TX;
  return registerByteEventCallback(kEventType, id, offset, cb);
}
// Stores the ping callback (may be nullptr).
// Fails with CONNECTION_CLOSED unless the transport is OPEN.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::setPingCallback(
    PingCallback* cb) {
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  VLOG(4) << "Setting ping callback  cb=" << cb << " " << *this;
  pingCallback_ = cb;
  return folly::unit;
}
// Requests a PING frame and, if a ping callback is installed and the timeout
// is non-zero, schedules the ping timeout. Does nothing when the transport is
// CLOSED (a gracefully closing transport still sends).
void QuicTransportBase::sendPing(std::chrono::milliseconds pingTimeout) {
  if (closeState_ == CloseState::CLOSED) {
    return;
  }

  // Mark the ping as pending and wake the write looper so it gets sent.
  conn_->pendingEvents.sendPing = true;
  updateWriteLooper(true);

  // Only arm the timeout when someone is listening and a timeout was given.
  const bool wantsTimeout =
      pingCallback_ && pingTimeout != std::chrono::milliseconds::zero();
  if (wantsTimeout) {
    schedulePingTimeout(pingCallback_, pingTimeout);
  }
}
// Records `pingCb` and arms the ping timeout, unless one is already scheduled
// in which case neither the callback nor the timer is touched.
void QuicTransportBase::schedulePingTimeout(
    PingCallback* pingCb,
    std::chrono::milliseconds timeout) {
  const bool alreadyArmed = isTimeoutScheduled(&pingTimeout_);
  if (alreadyArmed) {
    return;
  }
  pingCallback_ = pingCb;
  scheduleTimeout(&pingTimeout_, timeout);
}
// Passing false clears maybeAckReceiveTimestampsConfigSentToPeer in the
// transport settings. Passing true is a no-op.
void QuicTransportBase::setAckRxTimestampsEnabled(bool enableAckRxTimestamps) {
  if (enableAckRxTimestamps) {
    return;
  }
  auto& settings = conn_->transportSettings;
  settings.maybeAckReceiveTimestampsConfigSentToPeer.clear();
}
// Moves the early-data application-parameter validator and getter into the
// connection state, replacing any previously stored functions.
void QuicTransportBase::setEarlyDataAppParamsFunctions(
    folly::Function<bool(const Optional<std::string>&, const Buf&) const>
        validator,
    folly::Function<Buf()> getter) {
  auto& connState = *conn_;
  connState.earlyDataAppParamsValidator = std::move(validator);
  connState.earlyDataAppParamsGetter = std::move(getter);
}
// Aborts every stream that is not flagged as a control stream.
//
// For each such stream:
//   - if this endpoint can send on it: a pending write callback (if any) gets
//     onStreamWriteError(), then the stream is reset with `error`;
//   - if this endpoint can receive on it: an installed read callback gets
//     readError() (or readErrorWithGroup() when the stream belongs to a
//     group), its peek callback entry is erased, and stopSending() is issued.
// `errorMsg` is attached to the QuicError delivered to the callbacks.
void QuicTransportBase::resetNonControlStreams(
ApplicationErrorCode error,
folly::StringPiece errorMsg) {
// Collect the ids first and act on them afterwards, rather than resetting
// from inside the stream-state iteration.
std::vector<StreamId> nonControlStreamIds;
nonControlStreamIds.reserve(conn_->streamManager->streamCount());
conn_->streamManager->streamStateForEach(
[&nonControlStreamIds](const auto& stream) {
if (!stream.isControl) {
nonControlStreamIds.push_back(stream.id);
}
});
for (auto id : nonControlStreamIds) {
// Send side: notify a waiting writer, then reset the stream.
if (isSendingStream(conn_->nodeType, id) || isBidirectionalStream(id)) {
auto writeCallbackIt = pendingWriteCallbacks_.find(id);
if (writeCallbackIt != pendingWriteCallbacks_.end()) {
writeCallbackIt->second->onStreamWriteError(
id, QuicError(error, errorMsg.str()));
}
resetStream(id, error);
}
// Receive side: notify the reader, drop the peek callback, stop sending.
if (isReceivingStream(conn_->nodeType, id) || isBidirectionalStream(id)) {
auto readCallbackIt = readCallbacks_.find(id);
if (readCallbackIt != readCallbacks_.end() &&
readCallbackIt->second.readCb) {
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));
if (!stream->groupId) {
readCallbackIt->second.readCb->readError(
id, QuicError(error, errorMsg.str()));
} else {
readCallbackIt->second.readCb->readErrorWithGroup(
id, *stream->groupId, QuicError(error, errorMsg.str()));
}
}
peekCallbacks_.erase(id);
stopSending(id, error);
}
}
}
// Stores the datagram callback (may be nullptr) and refreshes the read looper.
// Fails with CONNECTION_CLOSED unless the transport is OPEN.
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setDatagramCallback(DatagramCallback* cb) {
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  VLOG(4) << "Setting datagram callback  cb=" << cb << " " << *this;
  datagramCallback_ = cb;
  updateReadLooper();
  return folly::unit;
}
uint16_t QuicTransportBase::getDatagramSizeLimit() const {
CHECK(conn_);
auto maxDatagramPacketSize = std::min<decltype(conn_->udpSendPacketLen)>(
conn_->datagramState.maxWriteFrameSize, conn_->udpSendPacketLen);
return std::max<decltype(maxDatagramPacketSize)>(
0, maxDatagramPacketSize - kMaxDatagramPacketOverhead);
}
// Queues a datagram for sending and wakes the write looper.
//
// Returns INVALID_WRITE_DATA (and bumps the dropped-on-write stat) when:
//   - maxWriteFrameSize is 0, or
//   - the write buffer is full and sendDropOldDataFirst is disabled.
// When the buffer is full and sendDropOldDataFirst is enabled, the oldest
// queued datagram is dropped (also counted in the stat) to make room.
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::writeDatagram(
    Buf buf) {
  auto& datagramState = conn_->datagramState;

  // TODO(lniccolini) update max datagram frame size
  // https://github.com/quicwg/datagram/issues/3
  // For now, max_datagram_size > 0 means the peer supports datagram frames
  if (datagramState.maxWriteFrameSize == 0) {
    QUIC_STATS(conn_->statsCallback, onDatagramDroppedOnWrite);
    return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_DATA);
  }

  const bool bufferFull =
      datagramState.writeBuffer.size() >= datagramState.maxWriteBufferSize;
  if (bufferFull) {
    QUIC_STATS(conn_->statsCallback, onDatagramDroppedOnWrite);
    const bool dropOldest =
        conn_->transportSettings.datagramConfig.sendDropOldDataFirst;
    if (!dropOldest) {
      // TODO(lniccolini) use different return codes to signal the application
      // exactly why the datagram got dropped
      return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_DATA);
    }
    datagramState.writeBuffer.pop_front();
  }

  datagramState.writeBuffer.emplace_back(std::move(buf));
  updateWriteLooper(true);
  return folly::unit;
}
// Moves up to `atMost` datagrams out of the front of the read buffer and
// returns them; atMost == 0 means "all buffered datagrams".
// Fails with CONNECTION_CLOSED unless the transport is OPEN.
folly::Expected<std::vector<ReadDatagram>, LocalErrorCode>
QuicTransportBase::readDatagrams(size_t atMost) {
  CHECK(conn_);
  auto& pending = conn_->datagramState.readBuffer;
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }

  // Never take more than is buffered; 0 is shorthand for everything.
  atMost = (atMost == 0) ? pending.size() : std::min(atMost, pending.size());

  std::vector<ReadDatagram> retDatagrams;
  retDatagrams.reserve(atMost);
  const auto takeEnd = pending.begin() + atMost;
  for (auto it = pending.begin(); it != takeEnd; ++it) {
    retDatagrams.push_back(std::move(*it));
  }
  pending.erase(pending.begin(), takeEnd);
  return retDatagrams;
}
// Moves the payload buffers of up to `atMost` datagrams out of the front of
// the read buffer and returns them; atMost == 0 means "all buffered
// datagrams". Fails with CONNECTION_CLOSED unless the transport is OPEN.
folly::Expected<std::vector<Buf>, LocalErrorCode>
QuicTransportBase::readDatagramBufs(size_t atMost) {
  CHECK(conn_);
  auto& pending = conn_->datagramState.readBuffer;
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }

  // Never take more than is buffered; 0 is shorthand for everything.
  atMost = (atMost == 0) ? pending.size() : std::min(atMost, pending.size());

  std::vector<Buf> retDatagrams;
  retDatagrams.reserve(atMost);
  const auto takeEnd = pending.begin() + atMost;
  for (auto it = pending.begin(); it != takeEnd; ++it) {
    retDatagrams.push_back(it->bufQueue().move());
  }
  pending.erase(pending.begin(), takeEnd);
  return retDatagrams;
}
// Returns the priority stored on the stream.
//
// Errors:
//   CONNECTION_CLOSED  - the transport is not OPEN.
//   STREAM_NOT_EXISTS  - findStream() found no such stream.
folly::Expected<Priority, LocalErrorCode> QuicTransportBase::getStreamPriority(
    StreamId id) {
  const bool isOpen = closeState_ == CloseState::OPEN;
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  auto streamState = conn_->streamManager->findStream(id);
  if (!streamState) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  return streamState->priority;
}
// Only client transports can be detached from their event base.
bool QuicTransportBase::isDetachable() {
  const bool isClient = conn_->nodeType == QuicNodeType::Client;
  return isClient;
}
// Attaches this transport to a new event base.
//
// Debug-checked preconditions: no event base is currently attached, and the
// caller is running on `evbIn`'s thread.
void QuicTransportBase::attachEventBase(std::shared_ptr<QuicEventBase> evbIn) {
VLOG(10) << __func__ << " " << *this;
DCHECK(!getEventBase());
DCHECK(evbIn && evbIn->isInEventBaseThread());
evb_ = std::move(evbIn);
if (socket_) {
socket_->attachEventBase(evb_);
}
// Re-arm the ack, path-validation and idle timers on the new event base.
scheduleAckTimeout();
schedulePathValidationTimeout();
setIdleTimer();
// Re-home the three loopers, then refresh each one.
readLooper_->attachEventBase(evb_);
peekLooper_->attachEventBase(evb_);
writeLooper_->attachEventBase(evb_);
updateReadLooper();
updatePeekLooper();
updateWriteLooper(false);
// Observer notification is compiled out when MVFST_USE_LIBEV is defined.
#ifndef MVFST_USE_LIBEV
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::evbEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<SocketObserverInterface::Events::evbEvents>(
[this](auto observer, auto observed) {
observer->evbAttach(observed, evb_.get());
});
}
#endif
}
// Detaches this transport from its current event base.
//
// Debug-checked precondition: an event base is attached and the caller is
// running on its thread. Pending write callbacks are dropped and the listed
// timeouts are cancelled before the loopers are detached.
void QuicTransportBase::detachEventBase() {
VLOG(10) << __func__ << " " << *this;
DCHECK(getEventBase() && getEventBase()->isInEventBaseThread());
if (socket_) {
socket_->detachEventBase();
}
// Drop the connection-level and per-stream pending write callbacks.
connWriteCallback_ = nullptr;
pendingWriteCallbacks_.clear();
// NOTE(review): pingTimeout_ is not among the timers cancelled here —
// confirm whether that is intentional.
cancelTimeout(&lossTimeout_);
cancelTimeout(&ackTimeout_);
cancelTimeout(&pathValidationTimeout_);
cancelTimeout(&idleTimeout_);
cancelTimeout(&keepaliveTimeout_);
cancelTimeout(&drainTimeout_);
readLooper_->detachEventBase();
peekLooper_->detachEventBase();
writeLooper_->detachEventBase();
// Observer notification is compiled out when MVFST_USE_LIBEV is defined.
// It runs before evb_ is cleared so observers still receive the old base.
#ifndef MVFST_USE_LIBEV
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::evbEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<SocketObserverInterface::Events::evbEvents>(
[this](auto observer, auto observed) {
observer->evbDetach(observed, evb_.get());
});
}
#endif
evb_ = nullptr;
}
// Streams the symbolic name of a CloseState. A value outside the three known
// enumerators writes nothing.
inline std::ostream& operator<<(
    std::ostream& os,
    const CloseState& closeState) {
  const char* label = nullptr;
  switch (closeState) {
    case CloseState::OPEN:
      label = "OPEN";
      break;
    case CloseState::GRACEFUL_CLOSING:
      label = "GRACEFUL_CLOSING";
      break;
    case CloseState::CLOSED:
      label = "CLOSED";
      break;
  }
  if (label != nullptr) {
    os << label;
  }
  return os;
}
// Resets the stream with the given code when `error` is an application error
// code; any other kind of error is ignored and success is returned.
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::maybeResetStreamFromReadError(
    StreamId id,
    QuicErrorCode error) {
  quic::ApplicationErrorCode* appCode = error.asApplicationErrorCode();
  if (!appCode) {
    return folly::Expected<folly::Unit, LocalErrorCode>(folly::unit);
  }
  return resetStream(id, *appCode);
}
// Forwards the control-message options to the UDP socket's setCmsgs().
//
// The socket is nullable elsewhere in this class (the constructor and the
// attach/detach paths all test it, and the destructor expects it to be gone
// after close), so a missing socket is logged and ignored here instead of
// being dereferenced.
void QuicTransportBase::setCmsgs(const folly::SocketCmsgMap& options) {
  if (!socket_) {
    VLOG(4) << __func__ << " ignored, no socket " << *this;
    return;
  }
  socket_->setCmsgs(options);
}
// Forwards the control-message options to the UDP socket's appendCmsgs().
//
// The socket is nullable elsewhere in this class (the constructor and the
// attach/detach paths all test it, and the destructor expects it to be gone
// after close), so a missing socket is logged and ignored here instead of
// being dereferenced.
void QuicTransportBase::appendCmsgs(const folly::SocketCmsgMap& options) {
  if (!socket_) {
    VLOG(4) << __func__ << " ignored, no socket " << *this;
    return;
  }
  socket_->appendCmsgs(options);
}
// Stores the background-mode threshold and utilization factor, registers this
// transport as the stream manager's priority-changes observer, and
// immediately re-evaluates background mode.
void QuicTransportBase::setBackgroundModeParameters(
    PriorityLevel maxBackgroundPriority,
    float backgroundUtilizationFactor) {
  backgroundPriorityThreshold_.assign(maxBackgroundPriority);
  backgroundUtilizationFactor_.assign(backgroundUtilizationFactor);

  auto& streamManager = *conn_->streamManager;
  streamManager.setPriorityChangesObserver(this);

  // Apply the new parameters to the current set of streams right away.
  onStreamPrioritiesChange();
}
// Clears the background-mode threshold and utilization factor, unregisters
// the stream manager's priority-changes observer, and re-evaluates background
// mode so the congestion controller's utilization factor is set back to 1.0.
void QuicTransportBase::clearBackgroundModeParameters() {
  backgroundPriorityThreshold_.clear();
  backgroundUtilizationFactor_.clear();

  auto& streamManager = *conn_->streamManager;
  streamManager.resetPriorityChangesObserver();

  onStreamPrioritiesChange();
}
// Re-evaluates background mode for the congestion controller.
//
// Background mode is used (utilization factor = backgroundUtilizationFactor_)
// only when both background parameters are set AND every stream is at or
// below the threshold priority, i.e. the highest priority level value is
// >= backgroundPriorityThreshold_. In every other case the utilization factor
// is set to 1.0. Nothing happens when there is no congestion controller.
void QuicTransportBase::onStreamPrioritiesChange() {
  auto& controller = conn_->congestionController;
  if (controller == nullptr) {
    return;
  }

  const bool backgroundModeConfigured =
      backgroundPriorityThreshold_.hasValue() &&
      backgroundUtilizationFactor_.hasValue();
  if (!backgroundModeConfigured) {
    controller->setBandwidthUtilizationFactor(1.0);
    return;
  }

  const bool allStreamsBackground =
      conn_->streamManager->getHighestPriorityLevel() >=
      backgroundPriorityThreshold_.value();
  float targetUtilization = 1.0f;
  if (allStreamsBackground) {
    targetUtilization = backgroundUtilizationFactor_.value();
  }

  VLOG(10) << fmt::format(
      "Updating transport background mode. Highest Priority={} Threshold={} TargetUtilization={}",
      conn_->streamManager->getHighestPriorityLevel(),
      backgroundPriorityThreshold_.value(),
      targetUtilization);
  controller->setBandwidthUtilizationFactor(targetUtilization);
}
// Member-function wrapper around the free function
// quic::checkCustomRetransmissionProfilesEnabled() for this connection.
bool QuicTransportBase::checkCustomRetransmissionProfilesEnabled() const {
  const bool enabled = quic::checkCustomRetransmissionProfilesEnabled(*conn_);
  return enabled;
}
// Sets or clears the retransmission policy of a stream group.
//
// An empty `policy` erases any stored policy for the group (back to the
// default) and always succeeds. Otherwise:
//   INVALID_OPERATION            - custom retransmission profiles are not
//                                  enabled on this connection.
//   RTX_POLICIES_LIMIT_EXCEEDED  - the number of stored policies has reached
//                                  advertisedMaxStreamGroups.
// The policy is stored with emplace(), which does not overwrite an entry that
// already exists for the group.
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setStreamGroupRetransmissionPolicy(
    StreamGroupId groupId,
    std::optional<QuicStreamGroupRetransmissionPolicy> policy) noexcept {
  auto& policies = conn_->retransmissionPolicies;

  // Reset the policy to default one.
  if (!policy.has_value()) {
    policies.erase(groupId);
    return folly::unit;
  }

  if (!checkCustomRetransmissionProfilesEnabled()) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }

  const bool limitReached =
      policies.size() >= conn_->transportSettings.advertisedMaxStreamGroups;
  if (limitReached) {
    return folly::makeUnexpected(LocalErrorCode::RTX_POLICIES_LIMIT_EXCEEDED);
  }

  policies.emplace(groupId, *policy);
  return folly::unit;
}
} // namespace quic