1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-04-18 17:24:03 +03:00
mvfst/quic/api/QuicTransportBase.cpp
Alan Frindell 444a0f261b Use new PriorityQueue interface
Summary:
Migrating mvfst priority API to be abstract, based on new classes in quic/priority.  For now, it requires applications to use `HTTPPriorityQueue::Priority`, to be compatible with the hardcoded `deprecated::PriorityQueue` implementation, and apps cannot yet change the queue impl.  Eventually the application will have full control of the queue.

There are minor functional changes in this diff:

1. Priority QLog types changed from int/bool to string
2. Any PAUSED stream has priority `u=7,i` if paused streams are disabled (previously explicitly settable to any priority)

Reviewed By: jbeshay

Differential Revision: D68696110

fbshipit-source-id: 5a4721b08248ac75d725f51b5cb3e5d5de206d86
2025-04-09 13:54:27 -07:00

829 lines
27 KiB
C++

/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/api/QuicTransportBase.h>
#include <folly/Chrono.h>
#include <folly/ScopeGuard.h>
#include <quic/api/LoopDetectorCallback.h>
#include <quic/api/QuicBatchWriterFactory.h>
#include <quic/api/QuicTransportFunctions.h>
#include <quic/common/Optional.h>
#include <quic/common/TimeUtil.h>
#include <quic/congestion_control/Pacer.h>
#include <quic/logging/QLoggerConstants.h>
#include <quic/loss/QuicLossFunctions.h>
#include <quic/state/QuicPacingFunctions.h>
#include <quic/state/QuicStateFunctions.h>
#include <quic/state/QuicStreamFunctions.h>
#include <quic/state/QuicStreamUtilities.h>
#include <quic/state/SimpleFrameFunctions.h>
#include <memory>
namespace {
/**
* Application error code meaning "no error". In this file it is the error
* passed to setReadCallbackInternal() when all read callbacks are unset.
*/
constexpr auto APP_NO_ERROR = quic::GenericApplicationErrorCode::NO_ERROR;
} // namespace
namespace quic {
/**
 * Forwards the event base, socket and callback flag to
 * QuicTransportBaseLite. When a socket is present, it is given a function
 * through which it can fetch additional cmsgs from this transport.
 */
QuicTransportBase::QuicTransportBase(
    std::shared_ptr<QuicEventBase> evb,
    std::unique_ptr<QuicAsyncUDPSocket> socket,
    bool useConnectionEndWithErrorCallback)
    : QuicTransportBaseLite(
          std::move(evb),
          std::move(socket),
          useConnectionEndWithErrorCallback) {
  if (!socket_) {
    return;
  }
  // Only `this` is needed inside the hook, so capture it explicitly.
  folly::Function<Optional<folly::SocketCmsgMap>()> cmsgProvider = [this]() {
    return getAdditionalCmsgsForAsyncUDPSocket();
  };
  socket_->setAdditionalCmsgsFunc(std::move(cmsgProvider));
}
/**
 * Hands the given pacing timer to the write looper. A null timer is ignored.
 */
void QuicTransportBase::setPacingTimer(
    QuicTimer::SharedPtr pacingTimer) noexcept {
  if (!pacingTimer) {
    return;
  }
  writeLooper_->setPacingTimer(std::move(pacingTimer));
}
/** Returns a copy of the connection state's client connection id. */
Optional<ConnectionId> QuicTransportBase::getClientConnectionId() const {
  const auto& connState = *conn_;
  return connState.clientConnectionId;
}
/** Returns a copy of the connection state's server connection id. */
Optional<ConnectionId> QuicTransportBase::getServerConnectionId() const {
  const auto& connState = *conn_;
  return connState.serverConnectionId;
}
/**
 * Returns a copy of the connection state's client-chosen destination
 * connection id.
 */
Optional<ConnectionId> QuicTransportBase::getClientChosenDestConnectionId()
    const {
  const auto& connState = *conn_;
  return connState.clientChosenDestConnectionId;
}
// Final teardown of the transport: resets the connection callbacks and
// cancels the drain timeout. The DCHECKs assert that the transport has
// already left the OPEN state and released its socket by this point.
QuicTransportBase::~QuicTransportBase() {
resetConnectionCallbacks();
// Just in case this ended up hanging around.
cancelTimeout(&drainTimeout_);
// closeImpl and closeUdpSocket should have been triggered by destructor of
// derived class to ensure that observers are properly notified
DCHECK_NE(CloseState::OPEN, closeState_);
DCHECK(!socket_.get()); // should be no socket
}
/** Returns true once the connection holds a 1-RTT write cipher. */
bool QuicTransportBase::replaySafe() const {
  const bool hasOneRttWriteCipher = conn_->oneRttWriteCipher != nullptr;
  return hasOneRttWriteCipher;
}
// Starts a graceful close. This is a no-op when the transport is already
// CLOSED or GRACEFUL_CLOSING. Otherwise it resets the connection callbacks,
// moves to GRACEFUL_CLOSING, stops the read and peek loopers and cancels all
// application callbacks. The transport is closed right away only when the
// stream manager reports zero streams.
void QuicTransportBase::closeGracefully() {
if (closeState_ == CloseState::CLOSED ||
closeState_ == CloseState::GRACEFUL_CLOSING) {
return;
}
// Hold a guard on this object for the rest of the function.
[[maybe_unused]] auto self = sharedGuard();
resetConnectionCallbacks();
closeState_ = CloseState::GRACEFUL_CLOSING;
updatePacingOnClose(*conn_);
if (conn_->qLogger) {
conn_->qLogger->addConnectionClose(kNoError, kGracefulExit, true, false);
}
// Stop reads and cancel all the app callbacks.
VLOG(10) << "Stopping read and peek loopers due to graceful close " << *this;
readLooper_->stop();
peekLooper_->stop();
cancelAllAppCallbacks(
QuicError(QuicErrorCode(LocalErrorCode::NO_ERROR), "Graceful Close"));
// If no streams remain, close the transport immediately.
if (conn_->streamManager->streamCount() == 0) {
closeImpl(none);
}
}
// Stub implementation: the stream id argument is ignored and the reported
// read offset is always 0.
folly::Expected<size_t, LocalErrorCode> QuicTransportBase::getStreamReadOffset(
StreamId) const {
return 0;
}
/**
 * Reports the current write offset of stream `id`.
 *
 * Returns INVALID_OPERATION when isReceivingStream() is true for the stream
 * and STREAM_NOT_EXISTS when the stream manager does not know it. Exceptions
 * thrown while fetching the stream state are logged and mapped to a
 * LocalErrorCode.
 */
folly::Expected<size_t, LocalErrorCode> QuicTransportBase::getStreamWriteOffset(
    StreamId id) const {
  const bool receivingOnly = isReceivingStream(conn_->nodeType, id);
  if (receivingOnly) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  const bool known = conn_->streamManager->streamExists(id);
  if (!known) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  try {
    auto streamState =
        CHECK_NOTNULL(conn_->streamManager->getStream(id).value_or(nullptr));
    return streamState->currentWriteOffset;
  } catch (const QuicInternalException& internalEx) {
    VLOG(4) << __func__ << " " << internalEx.what() << " " << *this;
    return folly::makeUnexpected(internalEx.errorCode());
  } catch (const QuicTransportException& transportEx) {
    VLOG(4) << __func__ << " " << transportEx.what() << " " << *this;
    return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
  } catch (const std::exception& stdEx) {
    VLOG(4) << __func__ << " " << stdEx.what() << " " << *this;
    return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
  }
}
/**
 * Reports the chain length of the pending writes buffered on stream `id`.
 *
 * Returns INVALID_OPERATION when isReceivingStream() is true for the stream
 * and STREAM_NOT_EXISTS when the stream manager does not know it. Exceptions
 * thrown while fetching the stream state are logged and mapped to a
 * LocalErrorCode.
 */
folly::Expected<size_t, LocalErrorCode>
QuicTransportBase::getStreamWriteBufferedBytes(StreamId id) const {
  const bool receivingOnly = isReceivingStream(conn_->nodeType, id);
  if (receivingOnly) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  const bool known = conn_->streamManager->streamExists(id);
  if (!known) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  try {
    auto streamState =
        CHECK_NOTNULL(conn_->streamManager->getStream(id).value_or(nullptr));
    return streamState->pendingWrites.chainLength();
  } catch (const QuicInternalException& internalEx) {
    VLOG(4) << __func__ << " " << internalEx.what() << " " << *this;
    return folly::makeUnexpected(internalEx.errorCode());
  } catch (const QuicTransportException& transportEx) {
    VLOG(4) << __func__ << " " << transportEx.what() << " " << *this;
    return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
  } catch (const std::exception& stdEx) {
    VLOG(4) << __func__ << " " << stdEx.what() << " " << *this;
    return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
  }
}
/**
 * Builds a FlowControlState snapshot for the connection from, in order: the
 * send-side flow control bytes, the peer-advertised max offset, the
 * receive-side flow control bytes, and the locally advertised max offset.
 */
folly::Expected<QuicSocket::FlowControlState, LocalErrorCode>
QuicTransportBase::getConnectionFlowControl() const {
  const auto& fc = conn_->flowControlState;
  return QuicSocket::FlowControlState(
      getSendConnFlowControlBytesAPI(*conn_),
      fc.peerAdvertisedMaxOffset,
      getRecvConnFlowControlBytes(*conn_),
      fc.advertisedMaxOffset);
}
/**
 * Returns maxWritableOnStream() for stream `id`.
 *
 * Fails with STREAM_NOT_EXISTS for an unknown stream and with
 * INVALID_OPERATION when isReceivingStream() is true for it.
 */
folly::Expected<uint64_t, LocalErrorCode>
QuicTransportBase::getMaxWritableOnStream(StreamId id) const {
  const bool known = conn_->streamManager->streamExists(id);
  if (!known) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  const bool receivingOnly = isReceivingStream(conn_->nodeType, id);
  if (receivingOnly) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  auto streamState =
      CHECK_NOTNULL(conn_->streamManager->getStream(id).value_or(nullptr));
  return maxWritableOnStream(*streamState);
}
/**
 * Sets the connection-level flow control window, gives the connection a
 * chance to send a window update, and kicks the write looper.
 *
 * Fails with CONNECTION_CLOSED unless the transport is OPEN.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setConnectionFlowControlWindow(uint64_t windowSize) {
  const bool isOpen = (closeState_ == CloseState::OPEN);
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  auto& connFlowControl = conn_->flowControlState;
  connFlowControl.windowSize = windowSize;
  maybeSendConnWindowUpdate(*conn_, Clock::now());
  updateWriteLooper(true);
  return folly::unit;
}
/**
 * Sets the flow control window of stream `id`, gives the stream a chance to
 * send a window update, and kicks the write looper.
 *
 * Fails with CONNECTION_CLOSED unless the transport is OPEN, and with
 * STREAM_NOT_EXISTS for an unknown stream.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setStreamFlowControlWindow(
    StreamId id,
    uint64_t windowSize) {
  const bool isOpen = (closeState_ == CloseState::OPEN);
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  const bool known = conn_->streamManager->streamExists(id);
  if (!known) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto streamState =
      CHECK_NOTNULL(conn_->streamManager->getStream(id).value_or(nullptr));
  streamState->flowControlState.windowSize = windowSize;
  maybeSendStreamWindowUpdate(*streamState, Clock::now());
  updateWriteLooper(true);
  return folly::unit;
}
/**
 * Replaces the read callback of every registered stream with nullptr,
 * passing APP_NO_ERROR as the error.
 */
void QuicTransportBase::unsetAllReadCallbacks() {
  for (const auto& entry : readCallbacks_) {
    setReadCallbackInternal(entry.first, nullptr, APP_NO_ERROR);
  }
}
/** Replaces the peek callback of every registered stream with nullptr. */
void QuicTransportBase::unsetAllPeekCallbacks() {
  for (const auto& entry : peekCallbacks_) {
    setPeekCallbackInternal(entry.first, nullptr);
  }
}
void QuicTransportBase::unsetAllDeliveryCallbacks() {
auto deliveryCallbacksCopy = deliveryCallbacks_;
for (const auto& [id, _] : deliveryCallbacksCopy) {
cancelDeliveryCallbacksForStream(id);
}
}
/** Pauses read callback delivery for stream `id`. */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::pauseRead(
    StreamId id) {
  constexpr bool kResume = false;
  VLOG(4) << __func__ << " " << *this << " stream=" << id;
  return pauseOrResumeRead(id, kResume);
}
/** Resumes read callback delivery for stream `id`. */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::resumeRead(
    StreamId id) {
  constexpr bool kResume = true;
  VLOG(4) << __func__ << " " << *this << " stream=" << id;
  return pauseOrResumeRead(id, kResume);
}
/**
 * Sets the `resumed` flag of the read callback registered for stream `id`
 * and refreshes the read looper when the flag actually changes.
 *
 * Errors: INVALID_OPERATION when isSendingStream() is true for the stream,
 * CONNECTION_CLOSED unless the transport is OPEN, STREAM_NOT_EXISTS for an
 * unknown stream, APP_ERROR when no read callback is registered.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::pauseOrResumeRead(StreamId id, bool resume) {
  if (isSendingStream(conn_->nodeType, id)) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  if (!conn_->streamManager->streamExists(id)) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto cbIt = readCallbacks_.find(id);
  if (cbIt == readCallbacks_.end()) {
    return folly::makeUnexpected(LocalErrorCode::APP_ERROR);
  }
  auto& cbData = cbIt->second;
  if (cbData.resumed == resume) {
    // Already in the requested state; nothing to update.
    return folly::unit;
  }
  cbData.resumed = resume;
  updateReadLooper();
  return folly::unit;
}
/**
 * Installs (or, with nullptr, clears) the peek callback for stream `id`.
 *
 * Fails with CONNECTION_CLOSED unless the transport is OPEN and with
 * STREAM_NOT_EXISTS for an unknown stream. Otherwise the result of
 * setPeekCallbackInternal() is returned, so its INVALID_OPERATION for
 * "nullptr on a stream that never had a peek callback" reaches the caller
 * instead of being reported as success.
 */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::setPeekCallback(
    StreamId id,
    PeekCallback* cb) {
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  if (!conn_->streamManager->streamExists(id)) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  return setPeekCallbackInternal(id, cb);
}
/**
 * Stores `cb` as the peek callback of stream `id` and refreshes the peek
 * looper.
 *
 * A stream with no entry yet gets one created, unless `cb` is nullptr, in
 * which case INVALID_OPERATION is returned and nothing changes.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setPeekCallbackInternal(
    StreamId id,
    PeekCallback* cb) noexcept {
  VLOG(4) << "Setting setPeekCallback for stream=" << id << " cb=" << cb << " "
          << *this;
  const bool clearing = (cb == nullptr);
  auto entryIt = peekCallbacks_.find(id);
  if (entryIt == peekCallbacks_.end()) {
    // Don't allow initial setting of a nullptr callback.
    if (clearing) {
      return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
    }
    entryIt = peekCallbacks_.emplace(id, PeekCallbackData(cb)).first;
  }
  auto& entry = entryIt->second;
  if (clearing) {
    VLOG(10) << "Resetting the peek callback to nullptr " << "stream=" << id
             << " peekCb=" << entry.peekCb;
  }
  entry.peekCb = cb;
  updatePeekLooper();
  return folly::unit;
}
/** Pauses peek callback delivery for stream `id`. */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::pausePeek(
    StreamId id) {
  constexpr bool kResume = false;
  VLOG(4) << __func__ << " " << *this << " stream=" << id;
  return pauseOrResumePeek(id, kResume);
}
/** Resumes peek callback delivery for stream `id`. */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::resumePeek(
    StreamId id) {
  constexpr bool kResume = true;
  VLOG(4) << __func__ << " " << *this << " stream=" << id;
  return pauseOrResumePeek(id, kResume);
}
/**
 * Sets the `resumed` flag of the peek callback registered for stream `id`
 * and refreshes the peek looper when the flag actually changes.
 *
 * Errors: CONNECTION_CLOSED unless the transport is OPEN, STREAM_NOT_EXISTS
 * for an unknown stream, APP_ERROR when no peek callback is registered.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::pauseOrResumePeek(StreamId id, bool resume) {
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  if (!conn_->streamManager->streamExists(id)) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto cbIt = peekCallbacks_.find(id);
  if (cbIt == peekCallbacks_.end()) {
    return folly::makeUnexpected(LocalErrorCode::APP_ERROR);
  }
  auto& cbData = cbIt->second;
  if (cbData.resumed == resume) {
    // Already in the requested state; nothing to update.
    return folly::unit;
  }
  cbData.resumed = resume;
  updatePeekLooper();
  return folly::unit;
}
/**
 * Runs `peekCallback` over the readable data of stream `id` via
 * peekDataFromQuicStream().
 *
 * Errors: CONNECTION_CLOSED unless the transport is OPEN, STREAM_NOT_EXISTS
 * for an unknown stream. If the stream carries a read error, a local error
 * code is returned as-is and any other kind is reported as INTERNAL_ERROR.
 *
 * Once past the closed check, the peek and write loopers are refreshed on
 * every exit path.
 */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::peek(
    StreamId id,
    const folly::Function<void(StreamId id, const folly::Range<PeekIterator>&)
                              const>& peekCallback) {
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  // Hold a guard on this object for the rest of the function.
  [[maybe_unused]] auto self = sharedGuard();
  SCOPE_EXIT {
    updatePeekLooper();
    updateWriteLooper(true);
  };
  if (!conn_->streamManager->streamExists(id)) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto stream =
      CHECK_NOTNULL(conn_->streamManager->getStream(id).value_or(nullptr));
  if (stream->streamReadError) {
    switch (stream->streamReadError->type()) {
      case QuicErrorCode::Type::LocalErrorCode:
        return folly::makeUnexpected(
            *stream->streamReadError->asLocalErrorCode());
      default:
        return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
    }
  }
  // peekCallback is a const reference, so std::move() on it could never
  // move anything; pass it through directly.
  peekDataFromQuicStream(*stream, peekCallback);
  return folly::makeExpected<LocalErrorCode>(folly::Unit());
}
/**
 * Consumes `amount` bytes from stream `id`, starting at the stream's current
 * read offset.
 *
 * Delegates to the offset-taking overload and reduces its error pair to the
 * LocalErrorCode alone. Fails with STREAM_NOT_EXISTS for an unknown stream.
 */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::consume(
    StreamId id,
    size_t amount) {
  const bool known = conn_->streamManager->streamExists(id);
  if (!known) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  auto streamState =
      CHECK_NOTNULL(conn_->streamManager->getStream(id).value_or(nullptr));
  auto outcome = consume(id, streamState->currentReadOffset, amount);
  if (outcome.hasError()) {
    return folly::makeUnexpected(outcome.error().first);
  }
  return folly::makeExpected<LocalErrorCode>(outcome.value());
}
// Consumes `amount` bytes from stream `id`, provided the stream's current
// read offset equals `offset`.
//
// The error is a pair of a LocalErrorCode and an optional read offset. The
// offset is filled in once the stream has been looked up, except on the
// stream-read-error paths, which return `none`.
//
// Errors: CONNECTION_CLOSED unless the transport is OPEN, STREAM_NOT_EXISTS
// for an unknown stream, INTERNAL_ERROR on an offset mismatch, and the
// stream's own read error when one is set (non-local kinds are reported as
// INTERNAL_ERROR). An exception thrown inside the try block closes the
// connection via closeImpl() before the error is returned.
//
// Once past the closed check, the peek, read and write loopers are refreshed
// on every exit path.
folly::Expected<folly::Unit, std::pair<LocalErrorCode, Optional<uint64_t>>>
QuicTransportBase::consume(StreamId id, uint64_t offset, size_t amount) {
using ConsumeError = std::pair<LocalErrorCode, Optional<uint64_t>>;
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::CONNECTION_CLOSED, none});
}
// Hold a guard on this object for the rest of the function.
[[maybe_unused]] auto self = sharedGuard();
SCOPE_EXIT {
updatePeekLooper();
updateReadLooper(); // consume may affect "read" API
updateWriteLooper(true);
};
// Stays empty until the stream has been found below.
Optional<uint64_t> readOffset;
try {
// Need to check that the stream exists first so that we don't
// accidentally let the API create a peer stream that was not
// sent by the peer.
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::STREAM_NOT_EXISTS, readOffset});
}
auto stream =
CHECK_NOTNULL(conn_->streamManager->getStream(id).value_or(nullptr));
readOffset = stream->currentReadOffset;
// The caller's offset must match the stream's current read offset.
if (stream->currentReadOffset != offset) {
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::INTERNAL_ERROR, readOffset});
}
if (stream->streamReadError) {
switch (stream->streamReadError->type()) {
case QuicErrorCode::Type::LocalErrorCode:
return folly::makeUnexpected(
ConsumeError{*stream->streamReadError->asLocalErrorCode(), none});
default:
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::INTERNAL_ERROR, none});
}
}
consumeDataFromQuicStream(*stream, amount);
return folly::makeExpected<ConsumeError>(folly::Unit());
} catch (const QuicTransportException& ex) {
VLOG(4) << "consume() error " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()), std::string("consume() error")));
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::TRANSPORT_ERROR, readOffset});
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()), std::string("consume() error")));
return folly::makeUnexpected(ConsumeError{ex.errorCode(), readOffset});
} catch (const std::exception& ex) {
VLOG(4) << "consume() error " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("consume() error")));
return folly::makeUnexpected(
ConsumeError{LocalErrorCode::INTERNAL_ERROR, readOffset});
}
}
/**
 * Asks the stream manager for the next bidirectional stream group.
 * Fails with CONNECTION_CLOSED unless the transport is OPEN.
 */
folly::Expected<StreamGroupId, LocalErrorCode>
QuicTransportBase::createBidirectionalStreamGroup() {
  const bool isOpen = (closeState_ == CloseState::OPEN);
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  auto& streams = *conn_->streamManager;
  return streams.createNextBidirectionalStreamGroup();
}
/**
 * Asks the stream manager for the next unidirectional stream group.
 * Fails with CONNECTION_CLOSED unless the transport is OPEN.
 */
folly::Expected<StreamGroupId, LocalErrorCode>
QuicTransportBase::createUnidirectionalStreamGroup() {
  const bool isOpen = (closeState_ == CloseState::OPEN);
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  auto& streams = *conn_->streamManager;
  return streams.createNextUnidirectionalStreamGroup();
}
/** Creates a bidirectional stream inside stream group `groupId`. */
folly::Expected<StreamId, LocalErrorCode>
QuicTransportBase::createBidirectionalStreamInGroup(StreamGroupId groupId) {
  constexpr bool kBidirectional = true;
  return createStreamInternal(kBidirectional, groupId);
}
/** Creates a unidirectional stream inside stream group `groupId`. */
folly::Expected<StreamId, LocalErrorCode>
QuicTransportBase::createUnidirectionalStreamInGroup(StreamGroupId groupId) {
  constexpr bool kBidirectional = false;
  return createStreamInternal(kBidirectional, groupId);
}
/** Member-function wrapper around the free function quic::isClientStream(). */
bool QuicTransportBase::isClientStream(StreamId stream) noexcept {
  const bool result = quic::isClientStream(stream);
  return result;
}
/** Member-function wrapper around the free function quic::isServerStream(). */
bool QuicTransportBase::isServerStream(StreamId stream) noexcept {
  const bool result = quic::isServerStream(stream);
  return result;
}
/**
 * Member-function wrapper around the free function
 * quic::getStreamDirectionality().
 */
StreamDirectionality QuicTransportBase::getStreamDirectionality(
    StreamId stream) noexcept {
  const StreamDirectionality result = quic::getStreamDirectionality(stream);
  return result;
}
/**
 * Registers `cb` as a byte event callback of type TX for the given stream
 * and offset.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::registerTxCallback(
    StreamId id,
    uint64_t offset,
    ByteEventCallback* cb) {
  constexpr auto kEventType = ByteEvent::Type::TX;
  return registerByteEventCallback(kEventType, id, offset, cb);
}
/**
 * Stores `cb` as the ping callback.
 * Fails with CONNECTION_CLOSED unless the transport is OPEN.
 */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::setPingCallback(
    PingCallback* cb) {
  const bool isOpen = (closeState_ == CloseState::OPEN);
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  VLOG(4) << "Setting ping callback " << " cb=" << cb << " " << *this;
  pingCallback_ = cb;
  return folly::unit;
}
/**
 * Requests that a ping be sent and, when a ping callback is installed and
 * `pingTimeout` is non-zero, schedules the ping timeout.
 *
 * Does nothing when the transport is CLOSED.
 */
void QuicTransportBase::sendPing(std::chrono::milliseconds pingTimeout) {
  if (closeState_ == CloseState::CLOSED) {
    return;
  }
  // Flag the pending ping and wake the write looper.
  conn_->pendingEvents.sendPing = true;
  updateWriteLooper(true);
  // A timeout is only meaningful with a callback to notify.
  const bool wantsTimeout = pingCallback_ && pingTimeout != 0ms;
  if (wantsTimeout) {
    schedulePingTimeout(pingCallback_, pingTimeout);
  }
}
/**
 * Stores `pingCb` as the ping callback and schedules the ping timeout.
 * When a ping timeout is already scheduled, nothing is changed.
 */
void QuicTransportBase::schedulePingTimeout(
    PingCallback* pingCb,
    std::chrono::milliseconds timeout) {
  const bool alreadyScheduled = isTimeoutScheduled(&pingTimeout_);
  if (!alreadyScheduled) {
    pingCallback_ = pingCb;
    scheduleTimeout(&pingTimeout_, timeout);
  }
}
/**
 * Passing false clears the ACK receive timestamps config held in the
 * transport settings. Passing true changes nothing.
 */
void QuicTransportBase::setAckRxTimestampsEnabled(bool enableAckRxTimestamps) {
  if (enableAckRxTimestamps) {
    return;
  }
  auto& settings = conn_->transportSettings;
  settings.maybeAckReceiveTimestampsConfigSentToPeer.clear();
}
/**
 * Moves the early-data app params validator and getter into the connection
 * state.
 */
void QuicTransportBase::setEarlyDataAppParamsFunctions(
    folly::Function<bool(const Optional<std::string>&, const Buf&) const>
        validator,
    folly::Function<Buf()> getter) {
  auto& connState = *conn_;
  connState.earlyDataAppParamsValidator = std::move(validator);
  connState.earlyDataAppParamsGetter = std::move(getter);
}
// Resets every stream whose state is not marked as a control stream.
//
// For streams matching the send-side check below, a pending write callback
// (if any) receives onStreamWriteError and the stream is reset with `error`.
// For streams matching the receive-side check, an installed read callback
// receives readError (or readErrorWithGroup when the stream has a group id),
// the peek callback entry is erased, and stopSending is called with `error`.
// Bidirectional streams match both checks.
void QuicTransportBase::resetNonControlStreams(
ApplicationErrorCode error,
folly::StringPiece errorMsg) {
// Collect the ids first; the loop below works from this list rather than
// from the stream manager's own iteration.
std::vector<StreamId> nonControlStreamIds;
nonControlStreamIds.reserve(conn_->streamManager->streamCount());
conn_->streamManager->streamStateForEach(
[&nonControlStreamIds](const auto& stream) {
if (!stream.isControl) {
nonControlStreamIds.push_back(stream.id);
}
});
for (auto id : nonControlStreamIds) {
// Send side.
if (isSendingStream(conn_->nodeType, id) || isBidirectionalStream(id)) {
auto writeCallbackIt = pendingWriteCallbacks_.find(id);
if (writeCallbackIt != pendingWriteCallbacks_.end()) {
writeCallbackIt->second->onStreamWriteError(
id, QuicError(error, errorMsg.str()));
}
resetStream(id, error);
}
// Receive side.
if (isReceivingStream(conn_->nodeType, id) || isBidirectionalStream(id)) {
auto readCallbackIt = readCallbacks_.find(id);
if (readCallbackIt != readCallbacks_.end() &&
readCallbackIt->second.readCb) {
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(id).value_or(nullptr));
if (!stream->groupId) {
readCallbackIt->second.readCb->readError(
id, QuicError(error, errorMsg.str()));
} else {
readCallbackIt->second.readCb->readErrorWithGroup(
id, *stream->groupId, QuicError(error, errorMsg.str()));
}
}
peekCallbacks_.erase(id);
stopSending(id, error);
}
}
}
/**
 * Stores `cb` as the datagram callback and refreshes the read looper.
 * Fails with CONNECTION_CLOSED unless the transport is OPEN.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setDatagramCallback(DatagramCallback* cb) {
  const bool isOpen = (closeState_ == CloseState::OPEN);
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  VLOG(4) << "Setting datagram callback " << " cb=" << cb << " " << *this;
  datagramCallback_ = cb;
  updateReadLooper();
  return folly::unit;
}
/**
 * Returns the largest datagram payload that can currently be written.
 *
 * The limit is the smaller of the datagram max write frame size and the UDP
 * send packet length, minus kMaxDatagramPacketOverhead. It is 0 when the
 * overhead does not fit into that size.
 */
uint16_t QuicTransportBase::getDatagramSizeLimit() const {
  CHECK(conn_);
  const auto maxDatagramPacketSize =
      std::min<decltype(conn_->udpSendPacketLen)>(
          conn_->datagramState.maxWriteFrameSize, conn_->udpSendPacketLen);
  // Compare before subtracting: with an unsigned packet-length type,
  // `max(0, size - overhead)` wraps around instead of clamping to zero.
  if (maxDatagramPacketSize <= kMaxDatagramPacketOverhead) {
    return 0;
  }
  return static_cast<uint16_t>(
      maxDatagramPacketSize - kMaxDatagramPacketOverhead);
}
/**
 * Queues `buf` on the datagram write buffer and kicks the write looper.
 *
 * Returns INVALID_WRITE_DATA when the max write frame size is 0, or when the
 * write buffer is full and sendDropOldDataFirst is off. When the buffer is
 * full and sendDropOldDataFirst is on, the oldest queued datagram is dropped
 * to make room. Every drop is reported through onDatagramDroppedOnWrite.
 */
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::writeDatagram(
    Buf buf) {
  auto& dgState = conn_->datagramState;
  // TODO(lniccolini) update max datagram frame size
  // https://github.com/quicwg/datagram/issues/3
  // For now, max_datagram_size > 0 means the peer supports datagram frames
  if (dgState.maxWriteFrameSize == 0) {
    QUIC_STATS(conn_->statsCallback, onDatagramDroppedOnWrite);
    return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_DATA);
  }
  const bool bufferFull =
      dgState.writeBuffer.size() >= dgState.maxWriteBufferSize;
  if (bufferFull) {
    QUIC_STATS(conn_->statsCallback, onDatagramDroppedOnWrite);
    const bool dropOldest =
        conn_->transportSettings.datagramConfig.sendDropOldDataFirst;
    if (!dropOldest) {
      // TODO(lniccolini) use different return codes to signal the application
      // exactly why the datagram got dropped
      return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_DATA);
    }
    dgState.writeBuffer.pop_front();
  }
  dgState.writeBuffer.emplace_back(std::move(buf));
  updateWriteLooper(true);
  return folly::unit;
}
/**
 * Moves up to `atMost` datagrams out of the front of the read buffer and
 * returns them. `atMost == 0` means "all buffered datagrams".
 *
 * Fails with CONNECTION_CLOSED unless the transport is OPEN.
 */
folly::Expected<std::vector<ReadDatagram>, LocalErrorCode>
QuicTransportBase::readDatagrams(size_t atMost) {
  CHECK(conn_);
  auto& pending = conn_->datagramState.readBuffer;
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  const size_t count =
      (atMost == 0) ? pending.size() : std::min(atMost, pending.size());
  std::vector<ReadDatagram> taken;
  taken.reserve(count);
  const auto first = pending.begin();
  const auto last = first + count;
  for (auto it = first; it != last; ++it) {
    taken.push_back(std::move(*it));
  }
  // Remove the moved-from entries from the buffer.
  pending.erase(first, last);
  return taken;
}
/**
 * Like readDatagrams(), but returns only the buffers: up to `atMost`
 * datagrams are taken from the front of the read buffer and each one's
 * buffer queue is moved out. `atMost == 0` means "all buffered datagrams".
 *
 * Fails with CONNECTION_CLOSED unless the transport is OPEN.
 */
folly::Expected<std::vector<Buf>, LocalErrorCode>
QuicTransportBase::readDatagramBufs(size_t atMost) {
  CHECK(conn_);
  auto& pending = conn_->datagramState.readBuffer;
  if (closeState_ != CloseState::OPEN) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  const size_t count =
      (atMost == 0) ? pending.size() : std::min(atMost, pending.size());
  std::vector<Buf> taken;
  taken.reserve(count);
  const auto first = pending.begin();
  const auto last = first + count;
  for (auto it = first; it != last; ++it) {
    taken.push_back(it->bufQueue().move());
  }
  // Remove the drained entries from the buffer.
  pending.erase(first, last);
  return taken;
}
/**
 * Returns the priority stored on stream `id`.
 *
 * Fails with CONNECTION_CLOSED unless the transport is OPEN and with
 * STREAM_NOT_EXISTS when the stream manager cannot find the stream.
 */
folly::Expected<PriorityQueue::Priority, LocalErrorCode>
QuicTransportBase::getStreamPriority(StreamId id) {
  const bool isOpen = (closeState_ == CloseState::OPEN);
  if (!isOpen) {
    return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
  }
  auto found = conn_->streamManager->findStream(id);
  if (!found) {
    return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
  }
  return found->priority;
}
/** Only a client-side transport reports itself as detachable. */
bool QuicTransportBase::isDetachable() {
  const bool isClient = (conn_->nodeType == QuicNodeType::Client);
  return isClient;
}
// Attaches the transport to `evbIn`. The DCHECKs require that no event base
// is currently attached and that the caller runs in `evbIn`'s thread.
//
// The socket (if any) and the three loopers are attached to the new event
// base, the ack, path validation and idle timers are rescheduled, the
// loopers are refreshed, and observers subscribed to evbEvents are notified
// through evbAttach.
void QuicTransportBase::attachEventBase(std::shared_ptr<QuicEventBase> evbIn) {
VLOG(10) << __func__ << " " << *this;
DCHECK(!getEventBase());
DCHECK(evbIn && evbIn->isInEventBaseThread());
evb_ = std::move(evbIn);
if (socket_) {
socket_->attachEventBase(evb_);
}
// Re-arm timers on the new event base.
scheduleAckTimeout();
schedulePathValidationTimeout();
setIdleTimer();
readLooper_->attachEventBase(evb_);
peekLooper_->attachEventBase(evb_);
writeLooper_->attachEventBase(evb_);
updateReadLooper();
updatePeekLooper();
updateWriteLooper(false);
// NOTE(review): detachEventBase() wraps its observer notification in
// #ifndef MVFST_USE_LIBEV while this one is unguarded - confirm that the
// asymmetry is intended.
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::evbEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<SocketObserverInterface::Events::evbEvents>(
[this](auto observer, auto observed) {
observer->evbAttach(observed, evb_.get());
});
}
}
// Detaches the transport from its current event base. The DCHECK requires an
// attached event base and a caller running in its thread.
//
// The socket (if any) and the three loopers are detached, the connection
// write callback and all pending write callbacks are dropped, the timers
// listed below are cancelled, observers subscribed to evbEvents are notified
// through evbDetach (only when MVFST_USE_LIBEV is not defined), and finally
// the event base pointer is cleared.
void QuicTransportBase::detachEventBase() {
VLOG(10) << __func__ << " " << *this;
DCHECK(getEventBase() && getEventBase()->isInEventBaseThread());
if (socket_) {
socket_->detachEventBase();
}
connWriteCallback_ = nullptr;
pendingWriteCallbacks_.clear();
// NOTE(review): pingTimeout_ is not cancelled in this list - confirm
// whether that is intentional.
cancelTimeout(&lossTimeout_);
cancelTimeout(&ackTimeout_);
cancelTimeout(&pathValidationTimeout_);
cancelTimeout(&idleTimeout_);
cancelTimeout(&keepaliveTimeout_);
cancelTimeout(&drainTimeout_);
readLooper_->detachEventBase();
peekLooper_->detachEventBase();
writeLooper_->detachEventBase();
#ifndef MVFST_USE_LIBEV
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::evbEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<SocketObserverInterface::Events::evbEvents>(
[this](auto observer, auto observed) {
observer->evbDetach(observed, evb_.get());
});
}
#endif
// Cleared last: the observer callback above still reads evb_.
evb_ = nullptr;
}
/**
 * Streams the name of a CloseState value. A value matching none of the
 * enumerators writes nothing.
 */
inline std::ostream& operator<<(
    std::ostream& os,
    const CloseState& closeState) {
  switch (closeState) {
    case CloseState::OPEN:
      return os << "OPEN";
    case CloseState::GRACEFUL_CLOSING:
      return os << "GRACEFUL_CLOSING";
    case CloseState::CLOSED:
      return os << "CLOSED";
  }
  return os;
}
/**
 * Resets stream `id` when `error` holds an application error code, using
 * that code. Any other kind of error is a successful no-op.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::maybeResetStreamFromReadError(
    StreamId id,
    QuicErrorCode error) {
  quic::ApplicationErrorCode* appCode = error.asApplicationErrorCode();
  if (appCode == nullptr) {
    return folly::Expected<folly::Unit, LocalErrorCode>(folly::unit);
  }
  return resetStream(id, *appCode);
}
/**
 * Forwards `options` to the socket's setCmsgs().
 *
 * This is a no-op when there is no socket. Other code in this file already
 * treats socket_ as possibly null (the constructor, attachEventBase and
 * detachEventBase check it, and the destructor expects it to be gone), so it
 * is guarded here rather than dereferenced blindly.
 */
void QuicTransportBase::setCmsgs(const folly::SocketCmsgMap& options) {
  if (!socket_) {
    return;
  }
  socket_->setCmsgs(options);
}
/**
 * Forwards `options` to the socket's appendCmsgs().
 *
 * This is a no-op when there is no socket. Other code in this file already
 * treats socket_ as possibly null (the constructor, attachEventBase and
 * detachEventBase check it, and the destructor expects it to be gone), so it
 * is guarded here rather than dereferenced blindly.
 */
void QuicTransportBase::appendCmsgs(const folly::SocketCmsgMap& options) {
  if (!socket_) {
    return;
  }
  socket_->appendCmsgs(options);
}
/**
 * Member-function wrapper around the free function
 * quic::checkCustomRetransmissionProfilesEnabled() for this connection.
 */
bool QuicTransportBase::checkCustomRetransmissionProfilesEnabled() const {
  const auto& connState = *conn_;
  return quic::checkCustomRetransmissionProfilesEnabled(connState);
}
/**
 * Sets, replaces or clears the retransmission policy of stream group
 * `groupId`.
 *
 * - `policy == std::nullopt` erases any stored policy (back to the default)
 *   and always succeeds.
 * - Otherwise custom retransmission profiles must be enabled, else
 *   INVALID_OPERATION is returned.
 * - A group that already has a policy gets it replaced. Previously `emplace`
 *   left the old policy in place while still reporting success, and a
 *   replacement could be rejected by the size limit even though it adds no
 *   entry.
 * - A new group is added only while fewer than advertisedMaxStreamGroups
 *   policies are stored, else RTX_POLICIES_LIMIT_EXCEEDED is returned.
 */
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setStreamGroupRetransmissionPolicy(
    StreamGroupId groupId,
    std::optional<QuicStreamGroupRetransmissionPolicy> policy) noexcept {
  auto& policies = conn_->retransmissionPolicies;
  // Reset the policy to default one.
  if (policy == std::nullopt) {
    policies.erase(groupId);
    return folly::unit;
  }
  if (!checkCustomRetransmissionProfilesEnabled()) {
    return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
  }
  // Replacing an existing policy does not grow the map, so it is not
  // subject to the limit check below.
  auto existing = policies.find(groupId);
  if (existing != policies.end()) {
    existing->second = *policy;
    return folly::unit;
  }
  if (policies.size() >= conn_->transportSettings.advertisedMaxStreamGroups) {
    return folly::makeUnexpected(LocalErrorCode::RTX_POLICIES_LIMIT_EXCEEDED);
  }
  policies.emplace(groupId, *policy);
  return folly::unit;
}
} // namespace quic