1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-04-18 17:24:03 +03:00
mvfst/quic/api/QuicTransportBaseLite.cpp
Alan Frindell 1e1c7defef Use new priority queue implementation
Summary:
This adds the new priority queue implementation and a TransportSetting that controls whether it should be used or not.  The default is still the old priority queue, so this diff should not introduce any functional changes in production code.

One key difference is that with the new queue, streams with new data that become connection flow control blocked are *removed* from the queue, and added back once more flow control comes.  I think this will make the scheduler slightly more efficient at writing low-priority loss streams when there's high-pri data and no connection flow control, since it doesn't need to skip over those streams when building the packet.

If this diff regresses build size, D72476484 should get it back.

Reviewed By: mjoras

Differential Revision: D72476486

fbshipit-source-id: 9665cf3f66dcdbfd57d2199d5c832529a68cfac0
2025-04-17 08:43:19 -07:00

3462 lines
126 KiB
C++

/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/api/LoopDetectorCallback.h>
#include <quic/api/QuicTransportBaseLite.h>
#include <quic/api/QuicTransportFunctions.h>
#include <quic/congestion_control/CongestionControllerFactory.h>
#include <quic/congestion_control/EcnL4sTracker.h>
#include <quic/congestion_control/TokenlessPacer.h>
#include <quic/flowcontrol/QuicFlowController.h>
#include <quic/loss/QuicLossFunctions.h>
#include <quic/state/QuicPacingFunctions.h>
#include <quic/state/QuicStreamFunctions.h>
#include <quic/state/stream/StreamSendHandlers.h>
#include <sstream>
namespace {
constexpr auto APP_NO_ERROR = quic::GenericApplicationErrorCode::NO_ERROR;
quic::QuicError maybeSetGenericAppError(
quic::Optional<quic::QuicError>&& error) {
return std::move(error).value_or(
quic::QuicError{APP_NO_ERROR, quic::toString(APP_NO_ERROR)});
}
} // namespace
namespace quic {
inline std::ostream& operator<<(
std::ostream& os,
const CloseState& closeState) {
switch (closeState) {
case CloseState::OPEN:
os << "OPEN";
break;
case CloseState::GRACEFUL_CLOSING:
os << "GRACEFUL_CLOSING";
break;
case CloseState::CLOSED:
os << "CLOSED";
break;
}
return os;
}
QuicTransportBaseLite::QuicTransportBaseLite(
std::shared_ptr<QuicEventBase> evb,
std::unique_ptr<QuicAsyncUDPSocket> socket,
bool useConnectionEndWithErrorCallback)
: evb_(std::move(evb)),
socket_(std::move(socket)),
useConnectionEndWithErrorCallback_(useConnectionEndWithErrorCallback),
lossTimeout_(this),
excessWriteTimeout_(this),
idleTimeout_(this),
keepaliveTimeout_(this),
ackTimeout_(this),
pathValidationTimeout_(this),
drainTimeout_(this),
pingTimeout_(this),
writeLooper_(new FunctionLooper(
evb_,
[this]() { pacedWriteDataToSocket(); },
LooperType::WriteLooper)),
readLooper_(new FunctionLooper(
evb_,
[this]() { invokeReadDataAndCallbacks(true); },
LooperType::ReadLooper)),
peekLooper_(new FunctionLooper(
evb_,
[this]() { invokePeekDataAndCallbacks(); },
LooperType::PeekLooper)) {}
void QuicTransportBaseLite::onNetworkData(
const folly::SocketAddress& peer,
NetworkData&& networkData) noexcept {
[[maybe_unused]] auto self = sharedGuard();
SCOPE_EXIT {
if (!conn_->transportSettings.networkDataPerSocketRead) {
checkForClosedStream();
updateReadLooper();
updatePeekLooper();
updateWriteLooper(true);
}
};
try {
conn_->lossState.totalBytesRecvd += networkData.getTotalData();
auto originalAckVersion = currentAckStateVersion(*conn_);
// handle PacketsReceivedEvent if requested by observers
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::packetsReceivedEvents>()) {
auto builder = SocketObserverInterface::PacketsReceivedEvent::Builder()
.setReceiveLoopTime(TimePoint::clock::now())
.setNumPacketsReceived(networkData.getPackets().size())
.setNumBytesReceived(networkData.getTotalData());
for (auto& packet : networkData.getPackets()) {
auto receivedUdpPacketBuilder =
SocketObserverInterface::PacketsReceivedEvent::ReceivedUdpPacket::
Builder()
.setPacketReceiveTime(packet.timings.receiveTimePoint)
.setPacketNumBytes(packet.buf.chainLength())
.setPacketTos(packet.tosValue);
if (packet.timings.maybeSoftwareTs) {
receivedUdpPacketBuilder.setPacketSoftwareRxTimestamp(
packet.timings.maybeSoftwareTs->systemClock.raw);
}
builder.addReceivedUdpPacket(
std::move(receivedUdpPacketBuilder).build());
}
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::packetsReceivedEvents>(
[event = std::move(builder).build()](
auto observer, auto observed) {
observer->packetsReceived(observed, event);
});
}
auto packets = std::move(networkData).movePackets();
for (auto& packet : packets) {
auto res = onReadData(peer, std::move(packet));
if (res.hasError()) {
VLOG(4) << __func__ << " " << res.error().message << " " << *this;
exceptionCloseWhat_ = res.error().message;
return closeImpl(res.error());
}
if (conn_->peerConnectionError) {
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed"));
return;
} else if (conn_->transportSettings.processCallbacksPerPacket) {
invokeReadDataAndCallbacks(false);
}
}
if (!conn_->transportSettings.networkDataPerSocketRead) {
processCallbacksAfterNetworkData();
}
if (closeState_ != CloseState::CLOSED) {
if (currentAckStateVersion(*conn_) != originalAckVersion) {
setIdleTimer();
conn_->receivedNewPacketBeforeWrite = true;
if (conn_->loopDetectorCallback) {
conn_->readDebugState.noReadReason = NoReadReason::READ_OK;
conn_->readDebugState.loopCount = 0;
}
} else if (conn_->loopDetectorCallback) {
conn_->readDebugState.noReadReason = NoReadReason::STALE_DATA;
conn_->loopDetectorCallback->onSuspiciousReadLoops(
++conn_->readDebugState.loopCount,
conn_->readDebugState.noReadReason);
}
// Reading data could process an ack and change the loss timer.
setLossDetectionAlarm(*conn_, *self);
// Reading data could change the state of the acks which could change
// the ack timer. But we need to call scheduleAckTimeout() for it to
// take effect.
scheduleAckTimeout();
// Received data could contain valid path response, in which case
// path validation timeout should be canceled
schedulePathValidationTimeout();
// If ECN is enabled, make sure that the packet marking is happening as
// expected
validateECNState();
} else {
// In the closed state, we would want to write a close if possible
// however the write looper will not be set.
auto result = writeSocketData();
if (result.hasError()) {
VLOG(4) << __func__ << " " << result.error().message << " " << *this;
exceptionCloseWhat_ = result.error().message;
closeImpl(result.error());
}
}
} catch (const QuicTransportException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
return closeImpl(
QuicError(QuicErrorCode(ex.errorCode()), std::string(ex.what())));
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
return closeImpl(
QuicError(QuicErrorCode(ex.errorCode()), std::string(ex.what())));
} catch (const QuicApplicationException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
return closeImpl(
QuicError(QuicErrorCode(ex.errorCode()), std::string(ex.what())));
} catch (const std::exception& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
return closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("error onNetworkData()")));
}
}
void QuicTransportBaseLite::close(Optional<QuicError> errorCode) {
[[maybe_unused]] auto self = sharedGuard();
// The caller probably doesn't need a conn callback any more because they
// explicitly called close.
resetConnectionCallbacks();
// If we were called with no error code, ensure that we are going to write
// an application close, so the peer knows it didn't come from the transport.
errorCode = maybeSetGenericAppError(std::move(errorCode));
closeImpl(std::move(errorCode), true);
}
void QuicTransportBaseLite::closeNow(Optional<QuicError> errorCode) {
DCHECK(getEventBase() && getEventBase()->isInEventBaseThread());
[[maybe_unused]] auto self = sharedGuard();
VLOG(4) << __func__ << " " << *this;
errorCode = maybeSetGenericAppError(std::move(errorCode));
closeImpl(std::move(errorCode), false);
// the drain timeout may have been scheduled by a previous close, in which
// case, our close would not take effect. This cancels the drain timeout in
// this case and expires the timeout.
if (isTimeoutScheduled(&drainTimeout_)) {
cancelTimeout(&drainTimeout_);
drainTimeoutExpired();
}
}
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBaseLite::stopSending(
StreamId id,
ApplicationErrorCode error) {
if (isSendingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto* stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
if (stream->recvState == StreamRecvState::Closed) {
// skip STOP_SENDING if ingress is already closed
return folly::unit;
}
if (conn_->transportSettings.dropIngressOnStopSending) {
processTxStopSending(*stream);
}
// send STOP_SENDING frame to peer
sendSimpleFrame(*conn_, StopSendingFrame(id, error));
updateWriteLooper(true);
return folly::unit;
}
folly::Expected<StreamId, LocalErrorCode>
QuicTransportBaseLite::createBidirectionalStream(bool /*replaySafe*/) {
return createStreamInternal(true);
}
folly::Expected<StreamId, LocalErrorCode>
QuicTransportBaseLite::createUnidirectionalStream(bool /*replaySafe*/) {
return createStreamInternal(false);
}
uint64_t QuicTransportBaseLite::getNumOpenableBidirectionalStreams() const {
return conn_->streamManager->openableLocalBidirectionalStreams();
}
uint64_t QuicTransportBaseLite::getNumOpenableUnidirectionalStreams() const {
return conn_->streamManager->openableLocalUnidirectionalStreams();
}
bool QuicTransportBaseLite::isUnidirectionalStream(StreamId stream) noexcept {
return quic::isUnidirectionalStream(stream);
}
bool QuicTransportBaseLite::isBidirectionalStream(StreamId stream) noexcept {
return quic::isBidirectionalStream(stream);
}
QuicSocketLite::WriteResult QuicTransportBaseLite::writeChain(
StreamId id,
Buf data,
bool eof,
ByteEventCallback* cb) {
if (isReceivingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
[[maybe_unused]] auto self = sharedGuard();
try {
// Check whether stream exists before calling getStream to avoid
// creating a peer stream if it does not exist yet.
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
if (!stream->writable()) {
return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED);
}
// Register DeliveryCallback for the data + eof offset.
if (cb) {
auto dataLength =
(data ? data->computeChainDataLength() : 0) + (eof ? 1 : 0);
if (dataLength) {
auto currentLargestWriteOffset = getLargestWriteOffsetSeen(*stream);
registerDeliveryCallback(
id, currentLargestWriteOffset + dataLength - 1, cb);
}
}
bool wasAppLimitedOrIdle = false;
if (conn_->congestionController) {
wasAppLimitedOrIdle = conn_->congestionController->isAppLimited();
wasAppLimitedOrIdle |= conn_->streamManager->isAppIdle();
}
auto result = writeDataToQuicStream(*stream, std::move(data), eof);
if (result.hasError()) {
VLOG(4) << __func__ << " streamId=" << id << " " << result.error().message
<< " " << *this;
exceptionCloseWhat_ = result.error().message;
closeImpl(
QuicError(result.error().code, std::string("writeChain() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
}
// If we were previously app limited restart pacing with the current rate.
if (wasAppLimitedOrIdle && conn_->pacer) {
conn_->pacer->reset();
}
updateWriteLooper(true);
} catch (const QuicTransportException& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()), std::string("writeChain() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()), std::string("writeChain() error")));
return folly::makeUnexpected(ex.errorCode());
} catch (const std::exception& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("writeChain() error")));
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
return folly::unit;
}
Optional<LocalErrorCode> QuicTransportBaseLite::shutdownWrite(StreamId id) {
if (isReceivingStream(conn_->nodeType, id)) {
return LocalErrorCode::INVALID_OPERATION;
}
return none;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::registerDeliveryCallback(
StreamId id,
uint64_t offset,
ByteEventCallback* cb) {
return registerByteEventCallback(ByteEvent::Type::ACK, id, offset, cb);
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::notifyPendingWriteOnConnection(
ConnectionWriteCallback* wcb) {
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (connWriteCallback_ != nullptr) {
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
}
// Assign the write callback before going into the loop so that if we close
// the connection while we are still scheduled, the write callback will get
// an error synchronously.
connWriteCallback_ = wcb;
runOnEvbAsync([](auto self) {
if (!self->connWriteCallback_) {
// The connection was probably closed.
return;
}
auto connWritableBytes = self->maxWritableOnConn();
if (connWritableBytes != 0) {
auto connWriteCallback = self->connWriteCallback_;
self->connWriteCallback_ = nullptr;
connWriteCallback->onConnectionWriteReady(connWritableBytes);
}
});
return folly::unit;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::unregisterStreamWriteCallback(StreamId id) {
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
if (pendingWriteCallbacks_.find(id) == pendingWriteCallbacks_.end()) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
pendingWriteCallbacks_.erase(id);
return folly::unit;
}
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBaseLite::resetStream(
StreamId id,
ApplicationErrorCode errorCode) {
return resetStreamInternal(id, errorCode, false /* reliable */);
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::updateReliableDeliveryCheckpoint(StreamId id) {
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream =
CHECK_NOTNULL(conn_->streamManager->getStream(id).value_or(nullptr));
if (stream->sendState == StreamSendState::ResetSent) {
// We already sent a reset, so there's really no reason why we should be
// doing any more checkpointing, especially since we cannot
// increase the reliable size in subsequent resets.
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
stream->reliableResetCheckpoint =
stream->currentWriteOffset + stream->pendingWrites.chainLength();
return folly::Unit();
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::resetStreamReliably(
StreamId id,
ApplicationErrorCode errorCode) {
if (!conn_->transportSettings.advertisedReliableResetStreamSupport ||
!conn_->peerAdvertisedReliableStreamResetSupport) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
return resetStreamInternal(id, errorCode, true /* reliable */);
}
void QuicTransportBaseLite::cancelDeliveryCallbacksForStream(StreamId id) {
cancelByteEventCallbacksForStream(ByteEvent::Type::ACK, id);
}
void QuicTransportBaseLite::cancelDeliveryCallbacksForStream(
StreamId id,
uint64_t offset) {
cancelByteEventCallbacksForStream(ByteEvent::Type::ACK, id, offset);
}
void QuicTransportBaseLite::cancelByteEventCallbacksForStream(
const StreamId id,
const Optional<uint64_t>& offsetUpperBound) {
invokeForEachByteEventType(
([this, id, &offsetUpperBound](const ByteEvent::Type type) {
cancelByteEventCallbacksForStream(type, id, offsetUpperBound);
}));
}
void QuicTransportBaseLite::cancelByteEventCallbacksForStream(
const ByteEvent::Type type,
const StreamId id,
const Optional<uint64_t>& offsetUpperBound) {
cancelByteEventCallbacksForStreamInternal(
type, id, [&offsetUpperBound](uint64_t cbOffset) {
return !offsetUpperBound || cbOffset < *offsetUpperBound;
});
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::notifyPendingWriteOnStream(
StreamId id,
StreamWriteCallback* wcb) {
if (isReceivingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
if (!stream->writable()) {
return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED);
}
if (wcb == nullptr) {
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
}
// Add the callback to the pending write callbacks so that if we are closed
// while we are scheduled in the loop, the close will error out the
// callbacks.
auto wcbEmplaceResult = pendingWriteCallbacks_.emplace(id, wcb);
if (!wcbEmplaceResult.second) {
if ((wcbEmplaceResult.first)->second != wcb) {
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
} else {
return folly::makeUnexpected(LocalErrorCode::CALLBACK_ALREADY_INSTALLED);
}
}
runOnEvbAsync([id](auto self) {
auto wcbIt = self->pendingWriteCallbacks_.find(id);
if (wcbIt == self->pendingWriteCallbacks_.end()) {
// the connection was probably closed.
return;
}
auto writeCallback = wcbIt->second;
if (!self->conn_->streamManager->streamExists(id)) {
self->pendingWriteCallbacks_.erase(wcbIt);
writeCallback->onStreamWriteError(
id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS));
return;
}
auto stream = CHECK_NOTNULL(
self->conn_->streamManager->getStream(id).value_or(nullptr));
if (!stream->writable()) {
self->pendingWriteCallbacks_.erase(wcbIt);
writeCallback->onStreamWriteError(
id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS));
return;
}
auto maxCanWrite = self->maxWritableOnStream(*stream);
if (maxCanWrite != 0) {
self->pendingWriteCallbacks_.erase(wcbIt);
writeCallback->onStreamWriteReady(id, maxCanWrite);
}
});
return folly::unit;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::registerByteEventCallback(
const ByteEvent::Type type,
const StreamId id,
const uint64_t offset,
ByteEventCallback* cb) {
if (isReceivingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
[[maybe_unused]] auto self = sharedGuard();
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
if (!cb) {
return folly::unit;
}
ByteEventMap& byteEventMap = getByteEventMap(type);
auto byteEventMapIt = byteEventMap.find(id);
if (byteEventMapIt == byteEventMap.end()) {
byteEventMap.emplace(
id,
std::initializer_list<std::remove_reference<
decltype(byteEventMap)>::type::mapped_type::value_type>(
{{offset, cb}}));
} else {
// Keep ByteEvents for the same stream sorted by offsets:
auto pos = std::upper_bound(
byteEventMapIt->second.begin(),
byteEventMapIt->second.end(),
offset,
[&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; });
if (pos != byteEventMapIt->second.begin()) {
auto matchingEvent = std::find_if(
byteEventMapIt->second.begin(),
pos,
[offset, cb](const ByteEventDetail& p) {
return ((p.offset == offset) && (p.callback == cb));
});
if (matchingEvent != pos) {
// ByteEvent has been already registered for the same type, id,
// offset and for the same recipient, return an INVALID_OPERATION
// error to prevent duplicate registrations.
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
}
byteEventMapIt->second.emplace(pos, offset, cb);
}
auto stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
// Notify recipients that the registration was successful.
cb->onByteEventRegistered(ByteEvent{id, offset, type});
// if the callback is already ready, we still insert, but schedule to
// process
Optional<uint64_t> maxOffsetReady;
switch (type) {
case ByteEvent::Type::ACK:
maxOffsetReady = getLargestDeliverableOffset(*stream);
break;
case ByteEvent::Type::TX:
maxOffsetReady = getLargestWriteOffsetTxed(*stream);
break;
}
if (maxOffsetReady.has_value() && (offset <= *maxOffsetReady)) {
runOnEvbAsync([id, cb, offset, type](auto selfObj) {
if (selfObj->closeState_ != CloseState::OPEN) {
// Close will error out all byte event callbacks.
return;
}
auto& byteEventMapL = selfObj->getByteEventMap(type);
auto streamByteEventCbIt = byteEventMapL.find(id);
if (streamByteEventCbIt == byteEventMapL.end()) {
return;
}
// This is scheduled to run in the future (during the next iteration of
// the event loop). It is possible that the ByteEventDetail list gets
// mutated between the time it was scheduled to now when we are ready to
// run it. Look at the current outstanding ByteEvents for this stream ID
// and confirm that our ByteEvent's offset and recipient callback are
// still present.
auto pos = std::find_if(
streamByteEventCbIt->second.begin(),
streamByteEventCbIt->second.end(),
[offset, cb](const ByteEventDetail& p) {
return ((p.offset == offset) && (p.callback == cb));
});
// if our byteEvent is not present, it must have been delivered already.
if (pos == streamByteEventCbIt->second.end()) {
return;
}
streamByteEventCbIt->second.erase(pos);
cb->onByteEvent(ByteEvent{id, offset, type});
});
}
return folly::unit;
}
bool QuicTransportBaseLite::good() const {
return closeState_ == CloseState::OPEN && hasWriteCipher() && !error();
}
bool QuicTransportBaseLite::error() const {
return conn_->localConnectionError.has_value();
}
uint64_t QuicTransportBaseLite::bufferSpaceAvailable() const {
auto bytesBuffered = conn_->flowControlState.sumCurStreamBufferLen;
auto totalBufferSpaceAvailable =
conn_->transportSettings.totalBufferSpaceAvailable;
return bytesBuffered > totalBufferSpaceAvailable
? 0
: totalBufferSpaceAvailable - bytesBuffered;
}
void QuicTransportBaseLite::setConnectionSetupCallback(
folly::MaybeManagedPtr<ConnectionSetupCallback> callback) {
connSetupCallback_ = callback;
}
void QuicTransportBaseLite::setConnectionCallback(
folly::MaybeManagedPtr<ConnectionCallback> callback) {
connCallback_ = callback;
if (connCallback_) {
runOnEvbAsync([](auto self) { self->processCallbacksAfterNetworkData(); });
}
}
void QuicTransportBaseLite::setConnectionCallbackFromCtor(
folly::MaybeManagedPtr<ConnectionCallback> callback) {
connCallback_ = callback;
}
Optional<LocalErrorCode> QuicTransportBaseLite::setControlStream(StreamId id) {
if (!conn_->streamManager->streamExists(id)) {
return LocalErrorCode::STREAM_NOT_EXISTS;
}
auto stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
conn_->streamManager->setStreamAsControl(*stream);
return none;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::setReadCallback(
StreamId id,
ReadCallback* cb,
Optional<ApplicationErrorCode> err) {
if (isSendingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (cb != nullptr && closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
return setReadCallbackInternal(id, cb, err);
}
folly::Expected<std::pair<Buf, bool>, LocalErrorCode>
QuicTransportBaseLite::read(StreamId id, size_t maxLen) {
if (isSendingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
[[maybe_unused]] auto self = sharedGuard();
SCOPE_EXIT {
updateReadLooper();
updatePeekLooper(); // read can affect "peek" API
updateWriteLooper(true);
};
try {
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
auto readResult = readDataFromQuicStream(*stream, maxLen);
if (readResult.hasError()) {
VLOG(4) << "read() error " << readResult.error().message << " " << *this;
exceptionCloseWhat_ = readResult.error().message;
closeImpl(QuicError(
QuicErrorCode(readResult.error().code), std::string("read() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
}
auto result = std::move(readResult.value());
if (result.second) {
VLOG(10) << "Delivered eof to app for stream=" << stream->id << " "
<< *this;
auto it = readCallbacks_.find(id);
if (it != readCallbacks_.end()) {
// it's highly unlikely that someone called read() without having a read
// callback so we don't deal with the case of someone installing a read
// callback after reading the EOM.
it->second.deliveredEOM = true;
}
}
return folly::makeExpected<LocalErrorCode>(std::move(result));
} catch (const QuicTransportException& ex) {
VLOG(4) << "read() error " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(
QuicError(QuicErrorCode(ex.errorCode()), std::string("read() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(
QuicError(QuicErrorCode(ex.errorCode()), std::string("read() error")));
return folly::makeUnexpected(ex.errorCode());
} catch (const std::exception& ex) {
VLOG(4) << "read() error " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("read() error")));
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
}
void QuicTransportBaseLite::setQLogger(std::shared_ptr<QLogger> qLogger) {
// setQLogger can be called multiple times for the same connection and with
// the same qLogger we track the number of times it gets set and the number
// of times it gets reset, and only stop qlog collection when the number of
// resets equals the number of times the logger was set
if (!conn_->qLogger) {
CHECK_EQ(qlogRefcnt_, 0);
} else {
CHECK_GT(qlogRefcnt_, 0);
}
if (qLogger) {
conn_->qLogger = std::move(qLogger);
conn_->qLogger->setDcid(conn_->clientChosenDestConnectionId);
if (conn_->nodeType == QuicNodeType::Server) {
conn_->qLogger->setScid(conn_->serverConnectionId);
} else {
conn_->qLogger->setScid(conn_->clientConnectionId);
}
qlogRefcnt_++;
} else {
if (conn_->qLogger) {
qlogRefcnt_--;
if (qlogRefcnt_ == 0) {
conn_->qLogger = nullptr;
}
}
}
}
const std::shared_ptr<QLogger> QuicTransportBaseLite::getQLogger() const {
return conn_->qLogger;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::setPriorityQueue(std::unique_ptr<PriorityQueue> queue) {
if (conn_) {
return conn_->streamManager->setPriorityQueue(std::move(queue));
}
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::setStreamPriority(
StreamId id,
PriorityQueue::Priority priority) {
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (!conn_->streamManager->streamExists(id)) {
// It's not an error to try to prioritize a non-existent stream.
return folly::unit;
}
// It's not an error to prioritize a stream after it's sent its FIN - this
// can reprioritize retransmissions.
conn_->streamManager->setStreamPriority(
id,
priority,
getSendConnFlowControlBytesWire(*conn_) > 0,
conn_->qLogger);
return folly::unit;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::setMaxPacingRate(uint64_t maxRateBytesPerSec) {
if (conn_->pacer) {
conn_->pacer->setMaxPacingRate(maxRateBytesPerSec);
return folly::unit;
} else {
LOG(WARNING)
<< "Cannot set max pacing rate without a pacer. Pacing Enabled = "
<< conn_->transportSettings.pacingEnabled;
return folly::makeUnexpected(LocalErrorCode::PACER_NOT_AVAILABLE);
}
}
void QuicTransportBaseLite::setThrottlingSignalProvider(
std::shared_ptr<ThrottlingSignalProvider> throttlingSignalProvider) {
DCHECK(conn_);
conn_->throttlingSignalProvider = throttlingSignalProvider;
}
uint64_t QuicTransportBaseLite::maxWritableOnStream(
const QuicStreamState& stream) const {
auto connWritableBytes = maxWritableOnConn();
auto streamFlowControlBytes = getSendStreamFlowControlBytesAPI(stream);
return std::min(streamFlowControlBytes, connWritableBytes);
}
void QuicTransportBaseLite::processConnectionSetupCallbacks(
QuicError&& cancelCode) {
// connSetupCallback_ could be null if start() was never
// invoked and the transport was destroyed or if the app initiated close.
if (connSetupCallback_) {
connSetupCallback_->onConnectionSetupError(std::move(cancelCode));
}
}
void QuicTransportBaseLite::processConnectionCallbacks(QuicError&& cancelCode) {
// connCallback_ could be null if start() was never
// invoked and the transport was destroyed or if the app initiated close.
if (!connCallback_) {
return;
}
if (useConnectionEndWithErrorCallback_) {
connCallback_->onConnectionEnd(cancelCode);
return;
}
if (processCancelCode(cancelCode)) {
connCallback_->onConnectionEnd();
} else {
connCallback_->onConnectionError(std::move(cancelCode));
}
}
QuicTransportBaseLite::ByteEventMap& QuicTransportBaseLite::getByteEventMap(
const ByteEvent::Type type) {
switch (type) {
case ByteEvent::Type::ACK:
return deliveryCallbacks_;
case ByteEvent::Type::TX:
return txCallbacks_;
}
LOG(FATAL) << "Unhandled case in getByteEventMap";
folly::assume_unreachable();
}
const QuicTransportBaseLite::ByteEventMap&
QuicTransportBaseLite::getByteEventMapConst(const ByteEvent::Type type) const {
switch (type) {
case ByteEvent::Type::ACK:
return deliveryCallbacks_;
case ByteEvent::Type::TX:
return txCallbacks_;
}
LOG(FATAL) << "Unhandled case in getByteEventMapConst";
folly::assume_unreachable();
}
folly::Expected<QuicSocketLite::StreamTransportInfo, LocalErrorCode>
QuicTransportBaseLite::getStreamTransportInfo(StreamId id) const {
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
auto packets = getNumPacketsTxWithNewData(*stream);
return StreamTransportInfo{
stream->totalHolbTime,
stream->holbCount,
bool(stream->lastHolbTime),
packets,
stream->streamLossCount,
stream->finalWriteOffset,
stream->finalReadOffset,
stream->streamReadError,
stream->streamWriteError};
}
const folly::SocketAddress& QuicTransportBaseLite::getPeerAddress() const {
return conn_->peerAddress;
}
const folly::SocketAddress& QuicTransportBaseLite::getOriginalPeerAddress()
const {
return conn_->originalPeerAddress;
}
std::shared_ptr<QuicEventBase> QuicTransportBaseLite::getEventBase() const {
return evb_;
}
Optional<std::string> QuicTransportBaseLite::getAppProtocol() const {
return conn_->handshakeLayer->getApplicationProtocol();
}
uint64_t QuicTransportBaseLite::getConnectionBufferAvailable() const {
return bufferSpaceAvailable();
}
folly::Expected<QuicSocketLite::FlowControlState, LocalErrorCode>
QuicTransportBaseLite::getStreamFlowControl(StreamId id) const {
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
return QuicSocketLite::FlowControlState(
getSendStreamFlowControlBytesAPI(*stream),
stream->flowControlState.peerAdvertisedMaxOffset,
getRecvStreamFlowControlBytes(*stream),
stream->flowControlState.advertisedMaxOffset);
}
void QuicTransportBaseLite::runOnEvbAsync(
folly::Function<void(std::shared_ptr<QuicTransportBaseLite>)> func) {
auto evb = getEventBase();
evb->runInLoop(
[self = sharedGuard(), func = std::move(func), evb]() mutable {
if (self->getEventBase() != evb) {
// The eventbase changed between scheduling the loop and invoking
// the callback, ignore this
return;
}
func(std::move(self));
},
true);
}
void QuicTransportBaseLite::updateWriteLooper(bool thisIteration) {
if (closeState_ == CloseState::CLOSED) {
VLOG(10) << nodeToString(conn_->nodeType)
<< " stopping write looper because conn closed " << *this;
writeLooper_->stop();
return;
}
// If socket writable events are in use, do nothing if we are already waiting
// for the write event.
if (conn_->transportSettings.useSockWritableEvents &&
socket_->isWritableCallbackSet()) {
return;
}
auto writeDataReason = shouldWriteData(*conn_);
if (writeDataReason != WriteDataReason::NO_WRITE) {
VLOG(10) << nodeToString(conn_->nodeType)
<< " running write looper thisIteration=" << thisIteration << " "
<< *this;
writeLooper_->run(thisIteration);
if (conn_->loopDetectorCallback) {
conn_->writeDebugState.needsWriteLoopDetect =
(conn_->loopDetectorCallback != nullptr);
}
} else {
VLOG(10) << nodeToString(conn_->nodeType) << " stopping write looper "
<< *this;
writeLooper_->stop();
if (conn_->loopDetectorCallback) {
conn_->writeDebugState.needsWriteLoopDetect = false;
conn_->writeDebugState.currentEmptyLoopCount = 0;
}
}
if (conn_->loopDetectorCallback) {
conn_->writeDebugState.writeDataReason = writeDataReason;
}
}
void QuicTransportBaseLite::updateReadLooper() {
if (closeState_ != CloseState::OPEN) {
VLOG(10) << "Stopping read looper " << *this;
readLooper_->stop();
return;
}
auto matcherFn = [&readCallbacks = readCallbacks_](StreamId s) {
auto readCb = readCallbacks.find(s);
if (readCb == readCallbacks.end()) {
return false;
}
// TODO: if the stream has an error and it is also paused we should
// still return an error
return readCb->second.readCb && readCb->second.resumed;
};
auto iter = std::find_if(
conn_->streamManager->readableStreams().begin(),
conn_->streamManager->readableStreams().end(),
matcherFn);
auto unidirIter = std::find_if(
conn_->streamManager->readableUnidirectionalStreams().begin(),
conn_->streamManager->readableUnidirectionalStreams().end(),
matcherFn);
if (iter != conn_->streamManager->readableStreams().end() ||
unidirIter !=
conn_->streamManager->readableUnidirectionalStreams().end() ||
!conn_->datagramState.readBuffer.empty()) {
VLOG(10) << "Scheduling read looper " << *this;
readLooper_->run();
} else {
VLOG(10) << "Stopping read looper " << *this;
readLooper_->stop();
}
}
void QuicTransportBaseLite::updatePeekLooper() {
if (peekCallbacks_.empty() || closeState_ != CloseState::OPEN) {
VLOG(10) << "Stopping peek looper " << *this;
peekLooper_->stop();
return;
}
VLOG(10) << "Updating peek looper, has "
<< conn_->streamManager->peekableStreams().size()
<< " peekable streams";
auto iter = std::find_if(
conn_->streamManager->peekableStreams().begin(),
conn_->streamManager->peekableStreams().end(),
[&peekCallbacks = peekCallbacks_](StreamId s) {
VLOG(10) << "Checking stream=" << s;
auto peekCb = peekCallbacks.find(s);
if (peekCb == peekCallbacks.end()) {
VLOG(10) << "No peek callbacks for stream=" << s;
return false;
}
if (!peekCb->second.resumed) {
VLOG(10) << "peek callback for stream=" << s << " not resumed";
}
if (!peekCb->second.peekCb) {
VLOG(10) << "no peekCb in peekCb stream=" << s;
}
return peekCb->second.peekCb && peekCb->second.resumed;
});
if (iter != conn_->streamManager->peekableStreams().end()) {
VLOG(10) << "Scheduling peek looper " << *this;
peekLooper_->run();
} else {
VLOG(10) << "Stopping peek looper " << *this;
peekLooper_->stop();
}
}
void QuicTransportBaseLite::maybeStopWriteLooperAndArmSocketWritableEvent() {
if (!socket_ || (closeState_ == CloseState::CLOSED)) {
return;
}
if (conn_->transportSettings.useSockWritableEvents &&
!socket_->isWritableCallbackSet()) {
// Check if all data has been written and we're not limited by flow
// control/congestion control.
auto writeReason = shouldWriteData(*conn_);
bool haveBufferToRetry = writeReason == WriteDataReason::BUFFERED_WRITE;
bool haveNewDataToWrite =
(writeReason != WriteDataReason::NO_WRITE) && !haveBufferToRetry;
bool haveCongestionControlWindow = true;
if (conn_->congestionController) {
haveCongestionControlWindow =
conn_->congestionController->getWritableBytes() > 0;
}
bool haveFlowControlWindow = getSendConnFlowControlBytesAPI(*conn_) > 0;
bool connHasWriteWindow =
haveCongestionControlWindow && haveFlowControlWindow;
if (haveBufferToRetry || (haveNewDataToWrite && connHasWriteWindow)) {
// Re-arm the write event and stop the write
// looper.
socket_->resumeWrite(this);
writeLooper_->stop();
}
}
}
void QuicTransportBaseLite::checkForClosedStream() {
if (closeState_ == CloseState::CLOSED) {
return;
}
auto itr = conn_->streamManager->closedStreams().begin();
while (itr != conn_->streamManager->closedStreams().end()) {
const auto& streamId = *itr;
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::streamEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::streamEvents>(
[event = SocketObserverInterface::StreamCloseEvent(
streamId,
getStreamInitiator(streamId),
getStreamDirectionality(streamId))](
auto observer, auto observed) {
observer->streamClosed(observed, event);
});
}
// We may be in an active read cb when we close the stream
auto readCbIt = readCallbacks_.find(*itr);
// We use the read callback as a way to defer destruction of the stream.
if (readCbIt != readCallbacks_.end() &&
readCbIt->second.readCb != nullptr) {
if (conn_->transportSettings.removeStreamAfterEomCallbackUnset ||
!readCbIt->second.deliveredEOM) {
VLOG(10) << "Not closing stream=" << *itr
<< " because it has active read callback";
++itr;
continue;
}
}
// We may be in the active peek cb when we close the stream
auto peekCbIt = peekCallbacks_.find(*itr);
if (peekCbIt != peekCallbacks_.end() &&
peekCbIt->second.peekCb != nullptr) {
VLOG(10) << "Not closing stream=" << *itr
<< " because it has active peek callback";
++itr;
continue;
}
// If we have pending byte events, delay closing the stream
auto numByteEventCb = getNumByteEventCallbacksForStream(*itr);
if (numByteEventCb > 0) {
VLOG(10) << "Not closing stream=" << *itr << " because it has "
<< numByteEventCb << " pending byte event callbacks";
++itr;
continue;
}
VLOG(10) << "Closing stream=" << *itr;
if (conn_->qLogger) {
conn_->qLogger->addTransportStateUpdate(
getClosingStream(folly::to<std::string>(*itr)));
}
if (connCallback_) {
connCallback_->onStreamPreReaped(*itr);
}
auto result = conn_->streamManager->removeClosedStream(*itr);
if (result.hasError()) {
exceptionCloseWhat_ = result.error().message;
closeImpl(QuicError(
result.error().code, std::string("checkForClosedStream() error")));
return;
}
maybeSendStreamLimitUpdates(*conn_);
if (readCbIt != readCallbacks_.end()) {
readCallbacks_.erase(readCbIt);
}
if (peekCbIt != peekCallbacks_.end()) {
peekCallbacks_.erase(peekCbIt);
}
itr = conn_->streamManager->closedStreams().erase(itr);
} // while
if (closeState_ == CloseState::GRACEFUL_CLOSING &&
conn_->streamManager->streamCount() == 0) {
closeImpl(none);
}
}
void QuicTransportBaseLite::writeSocketDataAndCatch() {
[[maybe_unused]] auto self = sharedGuard();
try {
auto result = writeSocketData();
if (result.hasError()) {
VLOG(4) << __func__ << " " << result.error().message << " " << *this;
exceptionCloseWhat_ = result.error().message;
closeImpl(result.error());
return;
}
processCallbacksAfterWriteData();
} catch (const QuicTransportException& ex) {
VLOG(4) << __func__ << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()),
std::string("writeSocketDataAndCatch() error")));
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()),
std::string("writeSocketDataAndCatch() error")));
} catch (const std::exception& ex) {
VLOG(4) << __func__ << " error=" << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("writeSocketDataAndCatch() error")));
}
}
void QuicTransportBaseLite::pacedWriteDataToSocket() {
[[maybe_unused]] auto self = sharedGuard();
SCOPE_EXIT {
self->maybeStopWriteLooperAndArmSocketWritableEvent();
};
if (!isConnectionPaced(*conn_)) {
// Not paced and connection is still open, normal write. Even if pacing is
// previously enabled and then gets disabled, and we are here due to a
// timeout, we should do a normal write to flush out the residue from
// pacing write.
writeSocketDataAndCatch();
if (conn_->transportSettings.scheduleTimerForExcessWrites) {
// If we still have data to write, yield the event loop now but schedule a
// timeout to come around and write again as soon as possible.
auto writeDataReason = shouldWriteData(*conn_);
if (writeDataReason != WriteDataReason::NO_WRITE &&
!excessWriteTimeout_.isTimerCallbackScheduled()) {
scheduleTimeout(&excessWriteTimeout_, 0ms);
}
}
return;
}
// We are in the middle of a pacing interval. Leave it be.
if (writeLooper_->isPacingScheduled()) {
// The next burst is already scheduled. Since the burst size doesn't
// depend on much data we currently have in buffer at all, no need to
// change anything.
return;
}
// Do a burst write before waiting for an interval. This will also call
// updateWriteLooper, but inside FunctionLooper we will ignore that.
writeSocketDataAndCatch();
}
folly::Expected<folly::Unit, QuicError>
QuicTransportBaseLite::writeSocketData() {
if (socket_) {
++(conn_->writeCount); // incremented on each write (or write attempt)
// record current number of sent packets to detect delta
const auto beforeTotalBytesSent = conn_->lossState.totalBytesSent;
const auto beforeTotalPacketsSent = conn_->lossState.totalPacketsSent;
const auto beforeTotalAckElicitingPacketsSent =
conn_->lossState.totalAckElicitingPacketsSent;
const auto beforeNumOutstandingPackets =
conn_->outstandings.numOutstanding();
updatePacketProcessorsPrewriteRequests();
// if we're starting to write from app limited, notify observers
if (conn_->appLimitedTracker.isAppLimited() &&
conn_->congestionController) {
conn_->appLimitedTracker.setNotAppLimited();
notifyStartWritingFromAppRateLimited();
}
auto result = writeData();
if (result.hasError()) {
return result;
}
if (closeState_ != CloseState::CLOSED) {
if (conn_->pendingEvents.closeTransport == true) {
return folly::makeUnexpected(QuicError(
TransportErrorCode::PROTOCOL_VIOLATION,
"Max packet number reached"));
}
setLossDetectionAlarm(*conn_, *this);
// check for change in number of packets
const auto afterTotalBytesSent = conn_->lossState.totalBytesSent;
const auto afterTotalPacketsSent = conn_->lossState.totalPacketsSent;
const auto afterTotalAckElicitingPacketsSent =
conn_->lossState.totalAckElicitingPacketsSent;
const auto afterNumOutstandingPackets =
conn_->outstandings.numOutstanding();
CHECK_LE(beforeTotalPacketsSent, afterTotalPacketsSent);
CHECK_LE(
beforeTotalAckElicitingPacketsSent,
afterTotalAckElicitingPacketsSent);
CHECK_LE(beforeNumOutstandingPackets, afterNumOutstandingPackets);
CHECK_EQ(
afterNumOutstandingPackets - beforeNumOutstandingPackets,
afterTotalAckElicitingPacketsSent -
beforeTotalAckElicitingPacketsSent);
const bool newPackets = (afterTotalPacketsSent > beforeTotalPacketsSent);
const bool newOutstandingPackets =
(afterTotalAckElicitingPacketsSent >
beforeTotalAckElicitingPacketsSent);
// if packets sent, notify observers
if (newPackets) {
notifyPacketsWritten(
afterTotalPacketsSent - beforeTotalPacketsSent
/* numPacketsWritten */,
afterTotalAckElicitingPacketsSent -
beforeTotalAckElicitingPacketsSent
/* numAckElicitingPacketsWritten */,
afterTotalBytesSent - beforeTotalBytesSent /* numBytesWritten */);
}
if (conn_->loopDetectorCallback && newOutstandingPackets) {
conn_->writeDebugState.currentEmptyLoopCount = 0;
} else if (
conn_->writeDebugState.needsWriteLoopDetect &&
conn_->loopDetectorCallback) {
// TODO: Currently we will to get some stats first. Then we may filter
// out some errors here. For example, socket fail to write might be a
// legit case to filter out.
conn_->loopDetectorCallback->onSuspiciousWriteLoops(
++conn_->writeDebugState.currentEmptyLoopCount,
conn_->writeDebugState.writeDataReason,
conn_->writeDebugState.noWriteReason,
conn_->writeDebugState.schedulerName);
}
// If we sent a new packet and the new packet was either the first
// packet after quiescence or after receiving a new packet.
if (newOutstandingPackets &&
(beforeNumOutstandingPackets == 0 ||
conn_->receivedNewPacketBeforeWrite)) {
// Reset the idle timer because we sent some data.
setIdleTimer();
conn_->receivedNewPacketBeforeWrite = false;
}
// Check if we are app-limited after finish this round of sending
auto currentSendBufLen = conn_->flowControlState.sumCurStreamBufferLen;
auto lossBufferEmpty = !conn_->streamManager->hasLoss() &&
conn_->cryptoState->initialStream.lossBuffer.empty() &&
conn_->cryptoState->handshakeStream.lossBuffer.empty() &&
conn_->cryptoState->oneRttStream.lossBuffer.empty();
if (conn_->congestionController &&
currentSendBufLen < conn_->udpSendPacketLen && lossBufferEmpty &&
conn_->congestionController->getWritableBytes()) {
conn_->congestionController->setAppLimited();
// notify via connection call and any observer callbacks
if (transportReadyNotified_ && connCallback_) {
connCallback_->onAppRateLimited();
}
conn_->appLimitedTracker.setAppLimited();
notifyAppRateLimited();
}
}
}
// Writing data could write out an ack which could cause us to cancel
// the ack timer. But we need to call scheduleAckTimeout() for it to take
// effect.
scheduleAckTimeout();
schedulePathValidationTimeout();
updateWriteLooper(false);
return folly::unit;
}
// TODO: t64691045 change the closeImpl API to include both the sanitized and
// unsanited error message, remove exceptionCloseWhat_.
void QuicTransportBaseLite::closeImpl(
Optional<QuicError> errorCode,
bool drainConnection,
bool sendCloseImmediately) {
if (closeState_ == CloseState::CLOSED) {
return;
}
if (getSocketObserverContainer()) {
SocketObserverInterface::CloseStartedEvent event;
event.maybeCloseReason = errorCode;
getSocketObserverContainer()->invokeInterfaceMethodAllObservers(
[&event](auto observer, auto observed) {
observer->closeStarted(observed, event);
});
}
drainConnection = drainConnection & conn_->transportSettings.shouldDrain;
uint64_t totalCryptoDataWritten = 0;
uint64_t totalCryptoDataRecvd = 0;
auto timeUntilLastInitialCryptoFrameReceived = std::chrono::milliseconds(0);
if (conn_->cryptoState) {
totalCryptoDataWritten +=
conn_->cryptoState->initialStream.currentWriteOffset;
totalCryptoDataWritten +=
conn_->cryptoState->handshakeStream.currentWriteOffset;
totalCryptoDataWritten +=
conn_->cryptoState->oneRttStream.currentWriteOffset;
totalCryptoDataRecvd += conn_->cryptoState->initialStream.maxOffsetObserved;
totalCryptoDataRecvd +=
conn_->cryptoState->handshakeStream.maxOffsetObserved;
totalCryptoDataRecvd += conn_->cryptoState->oneRttStream.maxOffsetObserved;
timeUntilLastInitialCryptoFrameReceived =
std::chrono::duration_cast<std::chrono::milliseconds>(
conn_->cryptoState->lastInitialCryptoFrameReceivedTimePoint -
conn_->connectionTime);
}
if (conn_->qLogger) {
auto tlsSummary = conn_->handshakeLayer->getTLSSummary();
conn_->qLogger->addTransportSummary(
{conn_->lossState.totalBytesSent,
conn_->lossState.totalBytesRecvd,
conn_->flowControlState.sumCurWriteOffset,
conn_->flowControlState.sumMaxObservedOffset,
conn_->flowControlState.sumCurStreamBufferLen,
conn_->lossState.totalBytesRetransmitted,
conn_->lossState.totalStreamBytesCloned,
conn_->lossState.totalBytesCloned,
totalCryptoDataWritten,
totalCryptoDataRecvd,
conn_->congestionController
? conn_->congestionController->getWritableBytes()
: std::numeric_limits<uint64_t>::max(),
getSendConnFlowControlBytesWire(*conn_),
conn_->lossState.totalPacketsSpuriouslyMarkedLost,
conn_->lossState.reorderingThreshold,
uint64_t(conn_->transportSettings.timeReorderingThreshDividend),
conn_->usedZeroRtt,
conn_->version.value_or(QuicVersion::MVFST_INVALID),
conn_->dsrPacketCount,
conn_->initialPacketsReceived,
conn_->uniqueInitialCryptoFramesReceived,
timeUntilLastInitialCryptoFrameReceived,
tlsSummary.alpn,
tlsSummary.namedGroup,
tlsSummary.pskType,
tlsSummary.echStatus});
}
// TODO: truncate the error code string to be 1MSS only.
closeState_ = CloseState::CLOSED;
updatePacingOnClose(*conn_);
auto cancelCode = QuicError(
QuicErrorCode(LocalErrorCode::NO_ERROR),
toString(LocalErrorCode::NO_ERROR).str());
if (conn_->peerConnectionError) {
cancelCode = *conn_->peerConnectionError;
} else if (errorCode) {
cancelCode = *errorCode;
}
// cancelCode is used for communicating error message to local app layer.
// errorCode will be used for localConnectionError, and sent in close frames.
// It's safe to include the unsanitized error message in cancelCode
if (exceptionCloseWhat_) {
cancelCode.message = exceptionCloseWhat_.value();
}
bool isReset = false;
bool isAbandon = false;
bool isInvalidMigration = false;
LocalErrorCode* localError = cancelCode.code.asLocalErrorCode();
TransportErrorCode* transportError = cancelCode.code.asTransportErrorCode();
if (localError) {
isReset = *localError == LocalErrorCode::CONNECTION_RESET;
isAbandon = *localError == LocalErrorCode::CONNECTION_ABANDONED;
}
isInvalidMigration = transportError &&
*transportError == TransportErrorCode::INVALID_MIGRATION;
VLOG_IF(4, isReset) << "Closing transport due to stateless reset " << *this;
VLOG_IF(4, isAbandon) << "Closing transport due to abandoned connection "
<< *this;
if (errorCode) {
conn_->localConnectionError = errorCode;
if (conn_->qLogger) {
conn_->qLogger->addConnectionClose(
conn_->localConnectionError->message,
errorCode->message,
drainConnection,
sendCloseImmediately);
}
} else if (conn_->qLogger) {
auto reason = folly::to<std::string>(
"Server: ",
kNoError,
", Peer: isReset: ",
isReset,
", Peer: isAbandon: ",
isAbandon);
conn_->qLogger->addConnectionClose(
kNoError, std::move(reason), drainConnection, sendCloseImmediately);
}
cancelLossTimeout();
cancelTimeout(&ackTimeout_);
cancelTimeout(&pathValidationTimeout_);
cancelTimeout(&idleTimeout_);
cancelTimeout(&keepaliveTimeout_);
cancelTimeout(&pingTimeout_);
cancelTimeout(&excessWriteTimeout_);
VLOG(10) << "Stopping read looper due to immediate close " << *this;
readLooper_->stop();
peekLooper_->stop();
writeLooper_->stop();
cancelAllAppCallbacks(cancelCode);
// Clear out all the pending events, we don't need them any more.
closeTransport();
// Clear out all the streams, we don't need them any more. When the peer
// receives the conn close they will implicitly reset all the streams.
conn_->streamManager->clearOpenStreams();
// Clear out all the buffered datagrams
conn_->datagramState.readBuffer.clear();
conn_->datagramState.writeBuffer.clear();
// Clear out all the pending events.
conn_->pendingEvents = QuicConnectionStateBase::PendingEvents();
conn_->streamManager->clearActionable();
conn_->streamManager->clearWritable();
if (conn_->ackStates.initialAckState) {
conn_->ackStates.initialAckState->acks.clear();
}
if (conn_->ackStates.handshakeAckState) {
conn_->ackStates.handshakeAckState->acks.clear();
}
conn_->ackStates.appDataAckState.acks.clear();
if (transportReadyNotified_) {
// This connection was open, update the stats for close.
QUIC_STATS(conn_->statsCallback, onConnectionClose, cancelCode.code);
processConnectionCallbacks(std::move(cancelCode));
} else {
processConnectionSetupCallbacks(std::move(cancelCode));
}
// can't invoke connection callbacks any more.
resetConnectionCallbacks();
// Don't need outstanding packets.
conn_->outstandings.reset();
// We don't need no congestion control.
conn_->congestionController = nullptr;
sendCloseImmediately = sendCloseImmediately && !isReset && !isAbandon;
if (sendCloseImmediately) {
// We might be invoked from the destructor, so just send the connection
// close directly.
auto result = writeData();
if (result.hasError()) {
LOG(ERROR) << "close failed with error: " << result.error().message << " "
<< *this;
}
}
drainConnection =
drainConnection && !isReset && !isAbandon && !isInvalidMigration;
if (drainConnection) {
// We ever drain once, and the object ever gets created once.
DCHECK(!isTimeoutScheduled(&drainTimeout_));
scheduleTimeout(
&drainTimeout_,
folly::chrono::ceil<std::chrono::milliseconds>(
kDrainFactor * calculatePTO(*conn_)));
} else {
drainTimeoutExpired();
}
}
void QuicTransportBaseLite::processCallbacksAfterNetworkData() {
if (closeState_ != CloseState::OPEN) {
return;
}
if (!connCallback_ || !conn_->streamManager) {
return;
}
// We reuse this storage for storing streams which need callbacks.
std::vector<StreamId> tempStorage;
handleNewStreamCallbacks(tempStorage);
if (closeState_ != CloseState::OPEN) {
return;
}
handleNewGroupedStreamCallbacks(tempStorage);
if (closeState_ != CloseState::OPEN) {
return;
}
handlePingCallbacks();
if (closeState_ != CloseState::OPEN) {
return;
}
handleKnobCallbacks();
if (closeState_ != CloseState::OPEN) {
return;
}
handleAckEventCallbacks();
if (closeState_ != CloseState::OPEN) {
return;
}
handleCancelByteEventCallbacks();
if (closeState_ != CloseState::OPEN) {
return;
}
handleDeliveryCallbacks();
if (closeState_ != CloseState::OPEN) {
return;
}
handleStreamFlowControlUpdatedCallbacks(tempStorage);
if (closeState_ != CloseState::OPEN) {
return;
}
handleStreamStopSendingCallbacks();
if (closeState_ != CloseState::OPEN) {
return;
}
handleConnWritable();
if (closeState_ != CloseState::OPEN) {
return;
}
invokeStreamsAvailableCallbacks();
cleanupAckEventState();
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::resetStreamInternal(
StreamId id,
ApplicationErrorCode errorCode,
bool reliable) {
if (isReceivingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
[[maybe_unused]] auto self = sharedGuard();
SCOPE_EXIT {
checkForClosedStream();
updateReadLooper();
updatePeekLooper();
updateWriteLooper(true);
};
try {
// Check whether stream exists before calling getStream to avoid
// creating a peer stream if it does not exist yet.
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id).value_or(nullptr);
CHECK(stream) << "Invalid stream in " << __func__ << ": " << id;
if (stream->appErrorCodeToPeer &&
*stream->appErrorCodeToPeer != errorCode) {
// We can't change the error code across resets for a stream
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
folly::Optional<uint64_t> maybeReliableSize = folly::none;
if (reliable) {
maybeReliableSize = stream->reliableResetCheckpoint;
}
if (stream->reliableSizeToPeer && maybeReliableSize &&
*maybeReliableSize > *stream->reliableSizeToPeer) {
// We can't increase the reliable size in a reset
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (maybeReliableSize && *maybeReliableSize > 0 &&
(stream->sendState == StreamSendState::ResetSent)) {
// We can't send a reliable reset with a non-zero reliable size if
// we've already sent a non-reliable reset
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
// Invoke state machine
auto result = sendRstSMHandler(*stream, errorCode, maybeReliableSize);
if (result.hasError()) {
VLOG(4) << __func__ << " streamId=" << id << " " << result.error().message
<< " " << *this;
exceptionCloseWhat_ = result.error().message;
closeImpl(
QuicError(result.error().code, std::string("resetStream() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
}
// Cancel all byte events for this stream which have offsets that don't
// need to be reliably delivered.
invokeForEachByteEventType(
([this, id, &maybeReliableSize](const ByteEvent::Type type) {
cancelByteEventCallbacksForStreamInternal(
type, id, [&maybeReliableSize](uint64_t offset) {
return !maybeReliableSize || offset >= *maybeReliableSize;
});
}));
pendingWriteCallbacks_.erase(id);
QUIC_STATS(conn_->statsCallback, onQuicStreamReset, errorCode);
} catch (const QuicTransportException& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()), std::string("resetStream() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()), std::string("resetStream() error")));
return folly::makeUnexpected(ex.errorCode());
} catch (const std::exception& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("resetStream() error")));
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
return folly::unit;
}
void QuicTransportBaseLite::cancelByteEventCallbacksForStreamInternal(
const ByteEvent::Type type,
const StreamId id,
const std::function<bool(uint64_t)>& offsetFilter) {
if (isReceivingStream(conn_->nodeType, id)) {
return;
}
auto& byteEventMap = getByteEventMap(type);
auto byteEventMapIt = byteEventMap.find(id);
if (byteEventMapIt == byteEventMap.end()) {
switch (type) {
case ByteEvent::Type::ACK:
conn_->streamManager->removeDeliverable(id);
break;
case ByteEvent::Type::TX:
conn_->streamManager->removeTx(id);
break;
}
return;
}
auto& streamByteEvents = byteEventMapIt->second;
// Callbacks are kept sorted by offset, so we can just walk the queue and
// invoke those with offset below provided offset.
while (!streamByteEvents.empty()) {
// decomposition not supported for xplat
const auto cbOffset = streamByteEvents.front().offset;
const auto callback = streamByteEvents.front().callback;
if (offsetFilter(cbOffset)) {
streamByteEvents.pop_front();
ByteEventCancellation cancellation{id, cbOffset, type};
callback->onByteEventCanceled(cancellation);
if (closeState_ != CloseState::OPEN) {
// socket got closed - we can't use streamByteEvents anymore,
// closeImpl should take care of cleaning up any remaining callbacks
return;
}
} else {
// Only larger or equal offsets left, exit the loop.
break;
}
}
// Clean up state for this stream if no callbacks left to invoke.
if (streamByteEvents.empty()) {
switch (type) {
case ByteEvent::Type::ACK:
conn_->streamManager->removeDeliverable(id);
break;
case ByteEvent::Type::TX:
conn_->streamManager->removeTx(id);
break;
}
// The callback could have changed the map so erase by id.
byteEventMap.erase(id);
}
}
void QuicTransportBaseLite::onSocketWritable() noexcept {
// Remove the writable callback.
socket_->pauseWrite();
// Try to write.
// If write fails again, pacedWriteDataToSocket() will re-arm the write event
// and stop the write looper.
writeLooper_->run(true /* thisIteration */);
}
void QuicTransportBaseLite::invokeStreamsAvailableCallbacks() {
if (conn_->streamManager->consumeMaxLocalBidirectionalStreamIdIncreased()) {
// check in case new streams were created in preceding callbacks
// and max is already reached
auto numOpenableStreams = getNumOpenableBidirectionalStreams();
if (numOpenableStreams > 0) {
connCallback_->onBidirectionalStreamsAvailable(numOpenableStreams);
}
}
if (conn_->streamManager->consumeMaxLocalUnidirectionalStreamIdIncreased()) {
// check in case new streams were created in preceding callbacks
// and max is already reached
auto numOpenableStreams = getNumOpenableUnidirectionalStreams();
if (numOpenableStreams > 0) {
connCallback_->onUnidirectionalStreamsAvailable(numOpenableStreams);
}
}
}
void QuicTransportBaseLite::handlePingCallbacks() {
if (conn_->pendingEvents.notifyPingReceived && pingCallback_ != nullptr) {
conn_->pendingEvents.notifyPingReceived = false;
if (pingCallback_ != nullptr) {
pingCallback_->onPing();
}
}
if (!conn_->pendingEvents.cancelPingTimeout) {
return; // nothing to cancel
}
if (!isTimeoutScheduled(&pingTimeout_)) {
// set cancelpingTimeOut to false, delayed acks
conn_->pendingEvents.cancelPingTimeout = false;
return; // nothing to do, as timeout has already fired
}
cancelTimeout(&pingTimeout_);
if (pingCallback_ != nullptr) {
pingCallback_->pingAcknowledged();
}
conn_->pendingEvents.cancelPingTimeout = false;
}
void QuicTransportBaseLite::handleKnobCallbacks() {
if (!conn_->transportSettings.advertisedKnobFrameSupport) {
VLOG(4) << "Received knob frames without advertising support";
conn_->pendingEvents.knobs.clear();
return;
}
for (auto& knobFrame : conn_->pendingEvents.knobs) {
if (knobFrame.knobSpace != kDefaultQuicTransportKnobSpace) {
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::knobFrameEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::knobFrameEvents>(
[event = quic::SocketObserverInterface::KnobFrameEvent(
Clock::now(), knobFrame)](auto observer, auto observed) {
observer->knobFrameReceived(observed, event);
});
}
connCallback_->onKnob(
knobFrame.knobSpace, knobFrame.id, std::move(knobFrame.blob));
} else {
// KnobId is ignored
onTransportKnobs(std::move(knobFrame.blob));
}
}
conn_->pendingEvents.knobs.clear();
}
void QuicTransportBaseLite::handleAckEventCallbacks() {
auto& lastProcessedAckEvents = conn_->lastProcessedAckEvents;
if (lastProcessedAckEvents.empty()) {
return; // nothing to do
}
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::acksProcessedEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::acksProcessedEvents>(
[event =
quic::SocketObserverInterface::AcksProcessedEvent::Builder()
.setAckEvents(lastProcessedAckEvents)
.build()](auto observer, auto observed) {
observer->acksProcessed(observed, event);
});
}
lastProcessedAckEvents.clear();
}
void QuicTransportBaseLite::handleCancelByteEventCallbacks() {
for (auto pendingResetIt = conn_->pendingEvents.resets.begin();
pendingResetIt != conn_->pendingEvents.resets.end();
pendingResetIt++) {
cancelByteEventCallbacksForStream(pendingResetIt->first);
if (closeState_ != CloseState::OPEN) {
return;
}
}
}
void QuicTransportBaseLite::handleNewStreamCallbacks(
std::vector<StreamId>& streamStorage) {
streamStorage = conn_->streamManager->consumeNewPeerStreams();
handleNewStreams(streamStorage);
}
void QuicTransportBaseLite::handleNewGroupedStreamCallbacks(
std::vector<StreamId>& streamStorage) {
auto newStreamGroups = conn_->streamManager->consumeNewPeerStreamGroups();
for (auto newStreamGroupId : newStreamGroups) {
if (isBidirectionalStream(newStreamGroupId)) {
connCallback_->onNewBidirectionalStreamGroup(newStreamGroupId);
} else {
connCallback_->onNewUnidirectionalStreamGroup(newStreamGroupId);
}
}
streamStorage = conn_->streamManager->consumeNewGroupedPeerStreams();
handleNewGroupedStreams(streamStorage);
}
void QuicTransportBaseLite::handleDeliveryCallbacks() {
auto deliverableStreamId = conn_->streamManager->popDeliverable();
while (deliverableStreamId.has_value()) {
auto streamId = *deliverableStreamId;
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
auto maxOffsetToDeliver = getLargestDeliverableOffset(*stream);
if (maxOffsetToDeliver.has_value()) {
size_t amountTrimmed = stream->writeBuffer.trimStartAtMost(
*maxOffsetToDeliver - stream->writeBufferStartOffset);
stream->writeBufferStartOffset += amountTrimmed;
}
if (maxOffsetToDeliver.has_value()) {
while (hasDeliveryCallbacksToCall(streamId, *maxOffsetToDeliver)) {
auto& deliveryCallbacksForAckedStream = deliveryCallbacks_.at(streamId);
auto deliveryCallbackAndOffset =
deliveryCallbacksForAckedStream.front();
deliveryCallbacksForAckedStream.pop_front();
auto currentDeliveryCallbackOffset = deliveryCallbackAndOffset.offset;
auto deliveryCallback = deliveryCallbackAndOffset.callback;
ByteEvent byteEvent{
streamId,
currentDeliveryCallbackOffset,
ByteEvent::Type::ACK,
conn_->lossState.srtt};
deliveryCallback->onByteEvent(byteEvent);
if (closeState_ != CloseState::OPEN) {
return;
}
}
}
auto deliveryCallbacksForAckedStream = deliveryCallbacks_.find(streamId);
if (deliveryCallbacksForAckedStream != deliveryCallbacks_.end() &&
deliveryCallbacksForAckedStream->second.empty()) {
deliveryCallbacks_.erase(deliveryCallbacksForAckedStream);
}
deliverableStreamId = conn_->streamManager->popDeliverable();
}
}
void QuicTransportBaseLite::handleStreamFlowControlUpdatedCallbacks(
std::vector<StreamId>& streamStorage) {
// Iterate over streams that changed their flow control window and give
// their registered listeners their updates.
// We don't really need flow control notifications when we are closed.
streamStorage = conn_->streamManager->consumeFlowControlUpdated();
const auto& flowControlUpdated = streamStorage;
for (auto streamId : flowControlUpdated) {
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
if (!stream->writable()) {
pendingWriteCallbacks_.erase(streamId);
continue;
}
connCallback_->onFlowControlUpdate(streamId);
if (closeState_ != CloseState::OPEN) {
return;
}
// In case the callback modified the stream map, get it again.
stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
auto maxStreamWritable = maxWritableOnStream(*stream);
if (maxStreamWritable != 0 && !pendingWriteCallbacks_.empty()) {
auto pendingWriteIt = pendingWriteCallbacks_.find(stream->id);
if (pendingWriteIt != pendingWriteCallbacks_.end()) {
auto wcb = pendingWriteIt->second;
pendingWriteCallbacks_.erase(stream->id);
wcb->onStreamWriteReady(stream->id, maxStreamWritable);
if (closeState_ != CloseState::OPEN) {
return;
}
}
}
}
streamStorage.clear();
}
void QuicTransportBaseLite::handleStreamStopSendingCallbacks() {
const auto stopSendingStreamsCopy =
conn_->streamManager->consumeStopSending();
for (const auto& itr : stopSendingStreamsCopy) {
connCallback_->onStopSending(itr.first, itr.second);
if (closeState_ != CloseState::OPEN) {
return;
}
}
}
void QuicTransportBaseLite::handleConnWritable() {
auto maxConnWrite = maxWritableOnConn();
if (maxConnWrite != 0) {
// If the connection now has flow control, we may either have been blocked
// before on a pending write to the conn, or a stream's write.
if (connWriteCallback_) {
auto connWriteCallback = connWriteCallback_;
connWriteCallback_ = nullptr;
connWriteCallback->onConnectionWriteReady(maxConnWrite);
}
// If the connection flow control is unblocked, we might be unblocked
// on the streams now.
auto writeCallbackIt = pendingWriteCallbacks_.begin();
while (writeCallbackIt != pendingWriteCallbacks_.end()) {
auto streamId = writeCallbackIt->first;
auto wcb = writeCallbackIt->second;
++writeCallbackIt;
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
if (!stream->writable()) {
pendingWriteCallbacks_.erase(streamId);
continue;
}
auto maxStreamWritable = maxWritableOnStream(*stream);
if (maxStreamWritable != 0) {
pendingWriteCallbacks_.erase(streamId);
wcb->onStreamWriteReady(streamId, maxStreamWritable);
if (closeState_ != CloseState::OPEN) {
return;
}
}
}
}
}
void QuicTransportBaseLite::cleanupAckEventState() {
// if there's no bytes in flight, clear any memory allocated for AckEvents
if (conn_->outstandings.packets.empty()) {
std::vector<AckEvent> empty;
conn_->lastProcessedAckEvents.swap(empty);
} // memory allocated for vector will be freed
}
folly::Expected<WriteQuicDataResult, QuicError>
QuicTransportBaseLite::handleInitialWriteDataCommon(
const ConnectionId& srcConnId,
const ConnectionId& dstConnId,
uint64_t packetLimit,
const std::string& token) {
CHECK(conn_->initialWriteCipher);
auto version = conn_->version.value_or(*(conn_->originalVersion));
auto& initialCryptoStream =
*getCryptoStream(*conn_->cryptoState, EncryptionLevel::Initial);
CryptoStreamScheduler initialScheduler(*conn_, initialCryptoStream);
auto& numProbePackets =
conn_->pendingEvents.numProbePackets[PacketNumberSpace::Initial];
if ((initialCryptoStream.retransmissionBuffer.size() &&
conn_->outstandings.packetCount[PacketNumberSpace::Initial] &&
numProbePackets) ||
initialScheduler.hasData() || toWriteInitialAcks(*conn_) ||
hasBufferedDataToWrite(*conn_)) {
CHECK(conn_->initialHeaderCipher);
return writeCryptoAndAckDataToSocket(
*socket_,
*conn_,
srcConnId /* src */,
dstConnId /* dst */,
LongHeader::Types::Initial,
*conn_->initialWriteCipher,
*conn_->initialHeaderCipher,
version,
packetLimit,
token);
}
return WriteQuicDataResult{};
}
folly::Expected<WriteQuicDataResult, QuicError>
QuicTransportBaseLite::handleHandshakeWriteDataCommon(
const ConnectionId& srcConnId,
const ConnectionId& dstConnId,
uint64_t packetLimit) {
auto version = conn_->version.value_or(*(conn_->originalVersion));
CHECK(conn_->handshakeWriteCipher);
auto& handshakeCryptoStream =
*getCryptoStream(*conn_->cryptoState, EncryptionLevel::Handshake);
CryptoStreamScheduler handshakeScheduler(*conn_, handshakeCryptoStream);
auto& numProbePackets =
conn_->pendingEvents.numProbePackets[PacketNumberSpace::Handshake];
if ((conn_->outstandings.packetCount[PacketNumberSpace::Handshake] &&
handshakeCryptoStream.retransmissionBuffer.size() && numProbePackets) ||
handshakeScheduler.hasData() || toWriteHandshakeAcks(*conn_) ||
hasBufferedDataToWrite(*conn_)) {
CHECK(conn_->handshakeWriteHeaderCipher);
return writeCryptoAndAckDataToSocket(
*socket_,
*conn_,
srcConnId /* src */,
dstConnId /* dst */,
LongHeader::Types::Handshake,
*conn_->handshakeWriteCipher,
*conn_->handshakeWriteHeaderCipher,
version,
packetLimit);
}
return WriteQuicDataResult{};
}
void QuicTransportBaseLite::closeUdpSocket() {
if (!socket_) {
return;
}
if (getSocketObserverContainer()) {
SocketObserverInterface::ClosingEvent event; // empty for now
getSocketObserverContainer()->invokeInterfaceMethodAllObservers(
[&event](auto observer, auto observed) {
observer->closing(observed, event);
});
}
auto sock = std::move(socket_);
socket_ = nullptr;
sock->pauseRead();
sock->close();
}
folly::Expected<StreamId, LocalErrorCode>
QuicTransportBaseLite::createStreamInternal(
bool bidirectional,
const OptionalIntegral<StreamGroupId>& streamGroupId) {
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
folly::Expected<QuicStreamState*, LocalErrorCode> streamResult;
if (bidirectional) {
streamResult =
conn_->streamManager->createNextBidirectionalStream(streamGroupId);
} else {
streamResult =
conn_->streamManager->createNextUnidirectionalStream(streamGroupId);
}
if (streamResult) {
const StreamId streamId = streamResult.value()->id;
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::streamEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::streamEvents>(
[event = SocketObserverInterface::StreamOpenEvent(
streamId,
getStreamInitiator(streamId),
getStreamDirectionality(streamId))](
auto observer, auto observed) {
observer->streamOpened(observed, event);
});
}
return streamId;
} else {
return folly::makeUnexpected(streamResult.error());
}
}
void QuicTransportBaseLite::cancelTimeout(QuicTimerCallback* callback) {
callback->cancelTimerCallback();
}
void QuicTransportBaseLite::excessWriteTimeoutExpired() noexcept {
auto writeDataReason = shouldWriteData(*conn_);
if (writeDataReason != WriteDataReason::NO_WRITE) {
pacedWriteDataToSocket();
}
}
void QuicTransportBaseLite::lossTimeoutExpired() noexcept {
CHECK_NE(closeState_, CloseState::CLOSED);
// onLossDetectionAlarm will set packetToSend in pending events
[[maybe_unused]] auto self = sharedGuard();
try {
auto result = onLossDetectionAlarm(*conn_, markPacketLoss);
if (result.hasError()) {
closeImpl(QuicError(
result.error().code, std::string("lossTimeoutExpired() error")));
return;
}
if (conn_->qLogger) {
conn_->qLogger->addTransportStateUpdate(kLossTimeoutExpired);
}
pacedWriteDataToSocket();
} catch (const QuicTransportException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()),
std::string("lossTimeoutExpired() error")));
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(ex.errorCode()),
std::string("lossTimeoutExpired() error")));
} catch (const std::exception& ex) {
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
exceptionCloseWhat_ = ex.what();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("lossTimeoutExpired() error")));
}
}
void QuicTransportBaseLite::idleTimeoutExpired(bool drain) noexcept {
VLOG(4) << __func__ << " " << *this;
[[maybe_unused]] auto self = sharedGuard();
// idle timeout is expired, just close the connection and drain or
// send connection close immediately depending on 'drain'
DCHECK_NE(closeState_, CloseState::CLOSED);
uint64_t numOpenStreans = conn_->streamManager->streamCount();
auto localError =
drain ? LocalErrorCode::IDLE_TIMEOUT : LocalErrorCode::SHUTTING_DOWN;
closeImpl(
quic::QuicError(
QuicErrorCode(localError),
folly::to<std::string>(
toString(localError),
", num non control streams: ",
numOpenStreans - conn_->streamManager->numControlStreams())),
drain /* drainConnection */,
!drain /* sendCloseImmediately */);
}
void QuicTransportBaseLite::keepaliveTimeoutExpired() noexcept {
[[maybe_unused]] auto self = sharedGuard();
conn_->pendingEvents.sendPing = true;
updateWriteLooper(true);
}
void QuicTransportBaseLite::ackTimeoutExpired() noexcept {
CHECK_NE(closeState_, CloseState::CLOSED);
VLOG(10) << __func__ << " " << *this;
[[maybe_unused]] auto self = sharedGuard();
updateAckStateOnAckTimeout(*conn_);
pacedWriteDataToSocket();
}
void QuicTransportBaseLite::pathValidationTimeoutExpired() noexcept {
CHECK(conn_->outstandingPathValidation);
conn_->pendingEvents.schedulePathValidationTimeout = false;
conn_->outstandingPathValidation.reset();
if (conn_->qLogger) {
conn_->qLogger->addPathValidationEvent(false);
}
// TODO junqiw probing is not supported, so pathValidation==connMigration
// We decide to close conn when pathValidation to migrated path fails.
[[maybe_unused]] auto self = sharedGuard();
closeImpl(QuicError(
QuicErrorCode(TransportErrorCode::INVALID_MIGRATION),
std::string("Path validation timed out")));
}
void QuicTransportBaseLite::drainTimeoutExpired() noexcept {
closeUdpSocket();
unbindConnection();
}
void QuicTransportBaseLite::pingTimeoutExpired() noexcept {
// If timeout expired just call the call back Provided
if (pingCallback_ != nullptr) {
pingCallback_->pingTimeout();
}
}
bool QuicTransportBaseLite::processCancelCode(const QuicError& cancelCode) {
bool noError = false;
switch (cancelCode.code.type()) {
case QuicErrorCode::Type::LocalErrorCode: {
LocalErrorCode localErrorCode = *cancelCode.code.asLocalErrorCode();
noError = localErrorCode == LocalErrorCode::NO_ERROR ||
localErrorCode == LocalErrorCode::IDLE_TIMEOUT ||
localErrorCode == LocalErrorCode::SHUTTING_DOWN;
break;
}
case QuicErrorCode::Type::TransportErrorCode: {
TransportErrorCode transportErrorCode =
*cancelCode.code.asTransportErrorCode();
noError = transportErrorCode == TransportErrorCode::NO_ERROR;
break;
}
case QuicErrorCode::Type::ApplicationErrorCode:
auto appErrorCode = *cancelCode.code.asApplicationErrorCode();
noError = appErrorCode == APP_NO_ERROR;
}
return noError;
}
uint64_t QuicTransportBaseLite::maxWritableOnConn() const {
auto connWritableBytes = getSendConnFlowControlBytesAPI(*conn_);
auto availableBufferSpace = bufferSpaceAvailable();
uint64_t ret = std::min(connWritableBytes, availableBufferSpace);
uint8_t multiplier = conn_->transportSettings.backpressureHeadroomFactor;
if (multiplier > 0) {
auto headRoom = multiplier * congestionControlWritableBytes(*conn_);
auto bufferLen = conn_->flowControlState.sumCurStreamBufferLen;
headRoom -= bufferLen > headRoom ? headRoom : bufferLen;
ret = std::min(ret, headRoom);
}
return ret;
}
void QuicTransportBaseLite::cancelAllAppCallbacks(
const QuicError& err) noexcept {
SCOPE_EXIT {
checkForClosedStream();
updateReadLooper();
updatePeekLooper();
updateWriteLooper(true);
};
conn_->streamManager->clearActionable();
// Cancel any pending ByteEvent callbacks
cancelAllByteEventCallbacks();
// TODO: this will become simpler when we change the underlying data
// structure of read callbacks.
// TODO: this approach will make the app unable to setReadCallback to
// nullptr during the loop. Need to fix that.
auto readCallbacksCopy = readCallbacks_;
for (auto& cb : readCallbacksCopy) {
auto streamId = cb.first;
auto it = readCallbacks_.find(streamId);
if (it == readCallbacks_.end()) {
// An earlier call to readError removed the stream from readCallbacks
// May not be possible?
continue;
}
if (it->second.readCb) {
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
if (!stream->groupId) {
it->second.readCb->readError(streamId, err);
} else {
it->second.readCb->readErrorWithGroup(streamId, *stream->groupId, err);
}
}
readCallbacks_.erase(it);
}
// TODO: what if a call to readError installs a new read callback?
LOG_IF(ERROR, !readCallbacks_.empty())
<< readCallbacks_.size() << " read callbacks remaining to be cleared";
VLOG(4) << "Clearing datagram callback";
datagramCallback_ = nullptr;
VLOG(4) << "Clearing ping callback";
pingCallback_ = nullptr;
VLOG(4) << "Clearing " << peekCallbacks_.size() << " peek callbacks";
auto peekCallbacksCopy = peekCallbacks_;
for (auto& cb : peekCallbacksCopy) {
peekCallbacks_.erase(cb.first);
if (cb.second.peekCb) {
cb.second.peekCb->peekError(cb.first, err);
}
}
if (connWriteCallback_) {
auto connWriteCallback = connWriteCallback_;
connWriteCallback_ = nullptr;
connWriteCallback->onConnectionWriteError(err);
}
auto pendingWriteCallbacksCopy = pendingWriteCallbacks_;
for (auto& wcb : pendingWriteCallbacksCopy) {
pendingWriteCallbacks_.erase(wcb.first);
wcb.second->onStreamWriteError(wcb.first, err);
}
}
void QuicTransportBaseLite::scheduleTimeout(
QuicTimerCallback* callback,
std::chrono::milliseconds timeout) {
if (evb_) {
evb_->scheduleTimeout(callback, timeout);
}
}
void QuicTransportBaseLite::scheduleLossTimeout(
std::chrono::milliseconds timeout) {
if (closeState_ == CloseState::CLOSED) {
return;
}
timeout = timeMax(timeout, evb_->getTimerTickInterval());
scheduleTimeout(&lossTimeout_, timeout);
}
void QuicTransportBaseLite::cancelLossTimeout() {
cancelTimeout(&lossTimeout_);
}
bool QuicTransportBaseLite::isLossTimeoutScheduled() {
return isTimeoutScheduled(&lossTimeout_);
}
size_t QuicTransportBaseLite::getNumByteEventCallbacksForStream(
const StreamId id) const {
size_t total = 0;
invokeForEachByteEventTypeConst(
([this, id, &total](const ByteEvent::Type type) {
total += getNumByteEventCallbacksForStream(type, id);
}));
return total;
}
size_t QuicTransportBaseLite::getNumByteEventCallbacksForStream(
const ByteEvent::Type type,
const StreamId id) const {
const auto& byteEventMap = getByteEventMapConst(type);
const auto byteEventMapIt = byteEventMap.find(id);
if (byteEventMapIt == byteEventMap.end()) {
return 0;
}
const auto& streamByteEvents = byteEventMapIt->second;
return streamByteEvents.size();
}
void QuicTransportBaseLite::cancelAllByteEventCallbacks() {
invokeForEachByteEventType(
([this](const ByteEvent::Type type) { cancelByteEventCallbacks(type); }));
}
void QuicTransportBaseLite::cancelByteEventCallbacks(
const ByteEvent::Type type) {
ByteEventMap byteEventMap = std::move(getByteEventMap(type));
for (const auto& [streamId, cbMap] : byteEventMap) {
for (const auto& [offset, cb] : cbMap) {
ByteEventCancellation cancellation{streamId, offset, type};
cb->onByteEventCanceled(cancellation);
}
}
}
StreamInitiator QuicTransportBaseLite::getStreamInitiator(
StreamId stream) noexcept {
return quic::getStreamInitiator(conn_->nodeType, stream);
}
QuicConnectionStats QuicTransportBaseLite::getConnectionsStats() const {
QuicConnectionStats connStats;
if (!conn_) {
return connStats;
}
connStats.peerAddress = conn_->peerAddress;
connStats.duration = Clock::now() - conn_->connectionTime;
if (conn_->congestionController) {
connStats.cwnd_bytes = conn_->congestionController->getCongestionWindow();
connStats.congestionController = conn_->congestionController->type();
conn_->congestionController->getStats(connStats.congestionControllerStats);
}
connStats.ptoCount = conn_->lossState.ptoCount;
connStats.srtt = conn_->lossState.srtt;
connStats.mrtt = conn_->lossState.mrtt;
connStats.lrtt = conn_->lossState.lrtt;
connStats.rttvar = conn_->lossState.rttvar;
connStats.peerAckDelayExponent = conn_->peerAckDelayExponent;
connStats.udpSendPacketLen = conn_->udpSendPacketLen;
if (conn_->streamManager) {
connStats.numStreams = conn_->streamManager->streams().size();
}
if (conn_->clientChosenDestConnectionId.hasValue()) {
connStats.clientChosenDestConnectionId =
conn_->clientChosenDestConnectionId->hex();
}
if (conn_->clientConnectionId.hasValue()) {
connStats.clientConnectionId = conn_->clientConnectionId->hex();
}
if (conn_->serverConnectionId.hasValue()) {
connStats.serverConnectionId = conn_->serverConnectionId->hex();
}
connStats.totalBytesSent = conn_->lossState.totalBytesSent;
connStats.totalBytesReceived = conn_->lossState.totalBytesRecvd;
connStats.totalBytesRetransmitted = conn_->lossState.totalBytesRetransmitted;
if (conn_->version.hasValue()) {
connStats.version = static_cast<uint32_t>(*conn_->version);
}
return connStats;
}
const TransportSettings& QuicTransportBaseLite::getTransportSettings() const {
return conn_->transportSettings;
}
bool QuicTransportBaseLite::isTimeoutScheduled(
QuicTimerCallback* callback) const {
return callback->isTimerCallbackScheduled();
}
void QuicTransportBaseLite::invokeReadDataAndCallbacks(
bool updateLoopersAndCheckForClosedStream) {
auto self = sharedGuard();
SCOPE_EXIT {
if (updateLoopersAndCheckForClosedStream) {
self->checkForClosedStream();
self->updateReadLooper();
self->updateWriteLooper(true);
}
};
// Need a copy since the set can change during callbacks.
std::vector<StreamId> readableStreamsCopy;
const auto& readableStreams = self->conn_->streamManager->readableStreams();
const auto& readableUnidirectionalStreams =
self->conn_->streamManager->readableUnidirectionalStreams();
readableStreamsCopy.reserve(
readableStreams.size() + readableUnidirectionalStreams.size());
if (self->conn_->transportSettings.unidirectionalStreamsReadCallbacksFirst) {
std::copy(
readableUnidirectionalStreams.begin(),
readableUnidirectionalStreams.end(),
std::back_inserter(readableStreamsCopy));
}
std::copy(
readableStreams.begin(),
readableStreams.end(),
std::back_inserter(readableStreamsCopy));
if (self->conn_->transportSettings.orderedReadCallbacks) {
std::sort(readableStreamsCopy.begin(), readableStreamsCopy.end());
}
for (StreamId streamId : readableStreamsCopy) {
auto callback = self->readCallbacks_.find(streamId);
if (callback == self->readCallbacks_.end()) {
// Stream doesn't have a read callback set, skip it.
continue;
}
auto readCb = callback->second.readCb;
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
if (readCb && stream->streamReadError &&
(!stream->reliableSizeFromPeer ||
*stream->reliableSizeFromPeer <= stream->currentReadOffset)) {
// If we got a reliable reset from the peer, we don't fire the readError
// callback and remove it until we've read all of the reliable data.
if (self->conn_->transportSettings
.unidirectionalStreamsReadCallbacksFirst &&
isUnidirectionalStream(streamId)) {
self->conn_->streamManager->readableUnidirectionalStreams().erase(
streamId);
} else {
self->conn_->streamManager->readableStreams().erase(streamId);
}
readCallbacks_.erase(callback);
// if there is an error on the stream - it's not readable anymore, so
// we cannot peek into it as well.
self->conn_->streamManager->peekableStreams().erase(streamId);
peekCallbacks_.erase(streamId);
VLOG(10) << "invoking read error callbacks on stream=" << streamId << " "
<< *this;
if (!stream->groupId) {
readCb->readError(streamId, QuicError(*stream->streamReadError));
} else {
readCb->readErrorWithGroup(
streamId, *stream->groupId, QuicError(*stream->streamReadError));
}
} else if (
readCb && callback->second.resumed && stream->hasReadableData()) {
VLOG(10) << "invoking read callbacks on stream=" << streamId << " "
<< *this;
if (!stream->groupId) {
readCb->readAvailable(streamId);
} else {
readCb->readAvailableWithGroup(streamId, *stream->groupId);
}
}
}
if (self->datagramCallback_ && !conn_->datagramState.readBuffer.empty()) {
self->datagramCallback_->onDatagramsAvailable();
}
}
void QuicTransportBaseLite::invokePeekDataAndCallbacks() {
auto self = sharedGuard();
SCOPE_EXIT {
self->checkForClosedStream();
self->updatePeekLooper();
self->updateWriteLooper(true);
};
// TODO: add protection from calling "consume" in the middle of the peek -
// one way is to have a peek counter that is incremented when peek calblack
// is called and decremented when peek is done. once counter transitions
// to 0 we can execute "consume" calls that were done during "peek", for that,
// we would need to keep stack of them.
std::vector<StreamId> peekableStreamsCopy;
const auto& peekableStreams = self->conn_->streamManager->peekableStreams();
peekableStreamsCopy.reserve(peekableStreams.size());
std::copy(
peekableStreams.begin(),
peekableStreams.end(),
std::back_inserter(peekableStreamsCopy));
VLOG(10) << __func__
<< " peekableListCopy.size()=" << peekableStreamsCopy.size();
for (StreamId streamId : peekableStreamsCopy) {
auto callback = self->peekCallbacks_.find(streamId);
// This is a likely bug. Need to think more on whether events can
// be dropped
// remove streamId from list of peekable - as opposed to "read", "peek" is
// only called once per streamId and not on every EVB loop until application
// reads the data.
self->conn_->streamManager->peekableStreams().erase(streamId);
if (callback == self->peekCallbacks_.end()) {
VLOG(10) << " No peek callback for stream=" << streamId;
continue;
}
auto peekCb = callback->second.peekCb;
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
if (peekCb && stream->streamReadError) {
VLOG(10) << "invoking peek error callbacks on stream=" << streamId << " "
<< *this;
peekCb->peekError(streamId, QuicError(*stream->streamReadError));
} else if (
peekCb && !stream->streamReadError && stream->hasPeekableData()) {
VLOG(10) << "invoking peek callbacks on stream=" << streamId << " "
<< *this;
peekDataFromQuicStream(
*stream,
[&](StreamId id, const folly::Range<PeekIterator>& peekRange) {
peekCb->onDataAvailable(id, peekRange);
});
} else {
VLOG(10) << "Not invoking peek callbacks on stream=" << streamId;
}
}
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBaseLite::setReadCallbackInternal(
StreamId id,
ReadCallback* cb,
Optional<ApplicationErrorCode> err) noexcept {
VLOG(4) << "Setting setReadCallback for stream=" << id << " cb=" << cb << " "
<< *this;
auto readCbIt = readCallbacks_.find(id);
if (readCbIt == readCallbacks_.end()) {
// Don't allow initial setting of a nullptr callback.
if (!cb) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
readCbIt = readCallbacks_.emplace(id, ReadCallbackData(cb)).first;
}
auto& readCb = readCbIt->second.readCb;
if (readCb == nullptr && cb != nullptr) {
// It's already been set to nullptr we do not allow unsetting it.
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
} else {
readCb = cb;
if (readCb == nullptr && err) {
return stopSending(id, err.value());
}
}
updateReadLooper();
return folly::unit;
}
Optional<folly::SocketCmsgMap>
QuicTransportBaseLite::getAdditionalCmsgsForAsyncUDPSocket() {
if (conn_->socketCmsgsState.additionalCmsgs) {
// This callback should be happening for the target write
DCHECK(conn_->writeCount == conn_->socketCmsgsState.targetWriteCount);
return conn_->socketCmsgsState.additionalCmsgs;
}
return none;
}
void QuicTransportBaseLite::notifyStartWritingFromAppRateLimited() {
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::appRateLimitedEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::appRateLimitedEvents>(
[event =
SocketObserverInterface::AppLimitedEvent::Builder()
.setOutstandingPackets(conn_->outstandings.packets)
.setWriteCount(conn_->writeCount)
.setLastPacketSentTime(
conn_->lossState.maybeLastPacketSentTime)
.setCwndInBytes(
conn_->congestionController
? Optional<uint64_t>(conn_->congestionController
->getCongestionWindow())
: none)
.setWritableBytes(
conn_->congestionController
? Optional<uint64_t>(conn_->congestionController
->getWritableBytes())
: none)
.build()](auto observer, auto observed) {
observer->startWritingFromAppLimited(observed, event);
});
}
}
void QuicTransportBaseLite::notifyPacketsWritten(
const uint64_t numPacketsWritten,
const uint64_t numAckElicitingPacketsWritten,
const uint64_t numBytesWritten) {
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::packetsWrittenEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::packetsWrittenEvents>(
[event =
SocketObserverInterface::PacketsWrittenEvent::Builder()
.setOutstandingPackets(conn_->outstandings.packets)
.setWriteCount(conn_->writeCount)
.setLastPacketSentTime(
conn_->lossState.maybeLastPacketSentTime)
.setCwndInBytes(
conn_->congestionController
? Optional<uint64_t>(conn_->congestionController
->getCongestionWindow())
: none)
.setWritableBytes(
conn_->congestionController
? Optional<uint64_t>(conn_->congestionController
->getWritableBytes())
: none)
.setNumPacketsWritten(numPacketsWritten)
.setNumAckElicitingPacketsWritten(
numAckElicitingPacketsWritten)
.setNumBytesWritten(numBytesWritten)
.build()](auto observer, auto observed) {
observer->packetsWritten(observed, event);
});
}
}
void QuicTransportBaseLite::notifyAppRateLimited() {
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::appRateLimitedEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<
SocketObserverInterface::Events::appRateLimitedEvents>(
[event =
SocketObserverInterface::AppLimitedEvent::Builder()
.setOutstandingPackets(conn_->outstandings.packets)
.setWriteCount(conn_->writeCount)
.setLastPacketSentTime(
conn_->lossState.maybeLastPacketSentTime)
.setCwndInBytes(
conn_->congestionController
? Optional<uint64_t>(conn_->congestionController
->getCongestionWindow())
: none)
.setWritableBytes(
conn_->congestionController
? Optional<uint64_t>(conn_->congestionController
->getWritableBytes())
: none)
.build()](auto observer, auto observed) {
observer->appRateLimited(observed, event);
});
}
}
void QuicTransportBaseLite::onTransportKnobs(Buf knobBlob) {
// Not yet implemented,
VLOG(4) << "Received transport knobs: "
<< std::string(
reinterpret_cast<const char*>(knobBlob->data()),
knobBlob->length());
}
void QuicTransportBaseLite::processCallbacksAfterWriteData() {
if (closeState_ != CloseState::OPEN) {
return;
}
auto txStreamId = conn_->streamManager->popTx();
while (txStreamId.has_value()) {
auto streamId = *txStreamId;
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
auto largestOffsetTxed = getLargestWriteOffsetTxed(*stream);
// if it's in the set of streams with TX, we should have a valid offset
CHECK(largestOffsetTxed.has_value());
// lambda to help get the next callback to call for this stream
auto getNextTxCallbackForStreamAndCleanup =
[this, &largestOffsetTxed](
const auto& streamId) -> Optional<ByteEventDetail> {
auto txCallbacksForStreamIt = txCallbacks_.find(streamId);
if (txCallbacksForStreamIt == txCallbacks_.end() ||
txCallbacksForStreamIt->second.empty()) {
return none;
}
auto& txCallbacksForStream = txCallbacksForStreamIt->second;
if (txCallbacksForStream.front().offset > *largestOffsetTxed) {
return none;
}
// extract the callback, pop from the queue, then check for cleanup
auto result = txCallbacksForStream.front();
txCallbacksForStream.pop_front();
if (txCallbacksForStream.empty()) {
txCallbacks_.erase(txCallbacksForStreamIt);
}
return result;
};
Optional<ByteEventDetail> nextOffsetAndCallback;
while (
(nextOffsetAndCallback =
getNextTxCallbackForStreamAndCleanup(streamId))) {
ByteEvent byteEvent{
streamId, nextOffsetAndCallback->offset, ByteEvent::Type::TX};
nextOffsetAndCallback->callback->onByteEvent(byteEvent);
// connection may be closed by callback
if (closeState_ != CloseState::OPEN) {
return;
}
}
// pop the next stream
txStreamId = conn_->streamManager->popTx();
}
}
void QuicTransportBaseLite::setIdleTimer() {
if (closeState_ == CloseState::CLOSED) {
return;
}
cancelTimeout(&idleTimeout_);
cancelTimeout(&keepaliveTimeout_);
auto localIdleTimeout = conn_->transportSettings.idleTimeout;
// The local idle timeout being zero means it is disabled.
if (localIdleTimeout == 0ms) {
return;
}
auto peerIdleTimeout =
conn_->peerIdleTimeout > 0ms ? conn_->peerIdleTimeout : localIdleTimeout;
auto idleTimeout = timeMin(localIdleTimeout, peerIdleTimeout);
scheduleTimeout(&idleTimeout_, idleTimeout);
auto idleTimeoutCount = idleTimeout.count();
if (conn_->transportSettings.enableKeepalive) {
std::chrono::milliseconds keepaliveTimeout = std::chrono::milliseconds(
idleTimeoutCount - static_cast<int64_t>(idleTimeoutCount * .15));
scheduleTimeout(&keepaliveTimeout_, keepaliveTimeout);
}
}
void QuicTransportBaseLite::setTransportSettings(
TransportSettings transportSettings) {
if (conn_->nodeType == QuicNodeType::Client) {
if (useSinglePacketInplaceBatchWriter(
transportSettings.maxBatchSize, transportSettings.dataPathType)) {
createBufAccessor(conn_->udpSendPacketLen);
} else if (
transportSettings.dataPathType ==
quic::DataPathType::ContinuousMemory) {
// Create generic buf for in-place batch writer.
createBufAccessor(
conn_->udpSendPacketLen * transportSettings.maxBatchSize);
}
}
// If transport parameters are encoded, we can only update congestion
// control related params. Setting other transport settings again would be
// buggy.
// TODO should we throw or return Expected here?
if (conn_->transportParametersEncoded) {
updateCongestionControlSettings(transportSettings);
} else {
// TODO: We should let chain based GSO to use bufAccessor in the future as
// well.
CHECK(
conn_->bufAccessor ||
transportSettings.dataPathType != DataPathType::ContinuousMemory);
conn_->transportSettings = std::move(transportSettings);
auto result = conn_->streamManager->refreshTransportSettings(
conn_->transportSettings);
LOG_IF(FATAL, result.hasError()) << result.error().message;
}
// A few values cannot be overridden to be lower than default:
// TODO refactor transport settings to avoid having to update params twice.
if (conn_->transportSettings.defaultCongestionController !=
CongestionControlType::None) {
conn_->transportSettings.initCwndInMss =
std::max(conn_->transportSettings.initCwndInMss, kInitCwndInMss);
conn_->transportSettings.minCwndInMss =
std::max(conn_->transportSettings.minCwndInMss, kMinCwndInMss);
conn_->transportSettings.initCwndInMss = std::max(
conn_->transportSettings.minCwndInMss,
conn_->transportSettings.initCwndInMss);
}
validateCongestionAndPacing(
conn_->transportSettings.defaultCongestionController);
if (conn_->transportSettings.pacingEnabled) {
writeLooper_->setPacingFunction([this]() -> auto {
if (isConnectionPaced(*conn_)) {
return conn_->pacer->getTimeUntilNextWrite();
}
return 0us;
});
bool usingBbr =
(conn_->transportSettings.defaultCongestionController ==
CongestionControlType::BBR ||
conn_->transportSettings.defaultCongestionController ==
CongestionControlType::BBRTesting ||
conn_->transportSettings.defaultCongestionController ==
CongestionControlType::BBR2);
auto minCwnd =
usingBbr ? kMinCwndInMssForBbr : conn_->transportSettings.minCwndInMss;
conn_->pacer = std::make_unique<TokenlessPacer>(*conn_, minCwnd);
conn_->pacer->setExperimental(conn_->transportSettings.experimentalPacer);
conn_->canBePaced = conn_->transportSettings.pacingEnabledFirstFlight;
}
setCongestionControl(conn_->transportSettings.defaultCongestionController);
if (conn_->transportSettings.datagramConfig.enabled) {
conn_->datagramState.maxReadFrameSize = kMaxDatagramFrameSize;
conn_->datagramState.maxReadBufferSize =
conn_->transportSettings.datagramConfig.readBufSize;
conn_->datagramState.maxWriteBufferSize =
conn_->transportSettings.datagramConfig.writeBufSize;
}
updateSocketTosSettings(conn_->transportSettings.dscpValue);
if (conn_->readCodec) {
// Update the codec parameters. In case of the client, the codec was
// initialized in the constructor and did not have the transport settings.
conn_->readCodec->setCodecParameters(CodecParameters(
conn_->peerAckDelayExponent,
conn_->originalVersion.value(),
conn_->transportSettings.maybeAckReceiveTimestampsConfigSentToPeer,
conn_->transportSettings.advertisedExtendedAckFeatures));
}
}
void QuicTransportBaseLite::setCongestionControl(CongestionControlType type) {
DCHECK(conn_);
if (!conn_->congestionController ||
type != conn_->congestionController->type()) {
CHECK(conn_->congestionControllerFactory);
validateCongestionAndPacing(type);
conn_->congestionController =
conn_->congestionControllerFactory->makeCongestionController(
*conn_, type);
if (conn_->qLogger) {
std::stringstream s;
s << "CCA set to " << congestionControlTypeToString(type);
conn_->qLogger->addTransportStateUpdate(s.str());
}
}
}
void QuicTransportBaseLite::setSupportedVersions(
const std::vector<QuicVersion>& versions) {
conn_->originalVersion = versions.at(0);
conn_->supportedVersions = versions;
}
void QuicTransportBaseLite::setCongestionControllerFactory(
std::shared_ptr<CongestionControllerFactory> ccFactory) {
CHECK(ccFactory);
CHECK(conn_);
conn_->congestionControllerFactory = ccFactory;
conn_->congestionController.reset();
}
void QuicTransportBaseLite::addPacketProcessor(
std::shared_ptr<PacketProcessor> packetProcessor) {
DCHECK(conn_);
conn_->packetProcessors.push_back(std::move(packetProcessor));
}
folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBaseLite::setKnob(
uint64_t knobSpace,
uint64_t knobId,
Buf knobBlob) {
if (isKnobSupported()) {
sendSimpleFrame(*conn_, KnobFrame(knobSpace, knobId, std::move(knobBlob)));
return folly::unit;
}
LOG(ERROR) << "Cannot set knob. Peer does not support the knob frame";
return folly::makeUnexpected(LocalErrorCode::KNOB_FRAME_UNSUPPORTED);
}
bool QuicTransportBaseLite::isKnobSupported() const {
return conn_->peerAdvertisedKnobFrameSupport;
}
void QuicTransportBaseLite::validateCongestionAndPacing(
CongestionControlType& type) {
// Fallback to Cubic if Pacing isn't enabled with BBR together
if ((type == CongestionControlType::BBR ||
type == CongestionControlType::BBRTesting ||
type == CongestionControlType::BBR2) &&
!conn_->transportSettings.pacingEnabled) {
LOG(ERROR) << "Unpaced BBR isn't supported";
type = CongestionControlType::Cubic;
}
if (type == CongestionControlType::BBR2 ||
type == CongestionControlType::BBRTesting) {
// We need to have the pacer rate be as accurate as possible for BBR2 and
// BBRTesting.
// The current BBR behavior is dependent on the existing pacing
// behavior so the override is only for BBR2.
// TODO: This should be removed once the pacer changes are adopted as
// the defaults or the pacer is fixed in another way.
conn_->transportSettings.experimentalPacer = true;
conn_->transportSettings.defaultRttFactor = {1, 1};
conn_->transportSettings.startupRttFactor = {1, 1};
if (conn_->pacer) {
conn_->pacer->setExperimental(conn_->transportSettings.experimentalPacer);
conn_->pacer->setRttFactor(
conn_->transportSettings.defaultRttFactor.first,
conn_->transportSettings.defaultRttFactor.second);
}
}
}
void QuicTransportBaseLite::updateSocketTosSettings(uint8_t dscpValue) {
const auto initialTosValue = conn_->socketTos.value;
conn_->socketTos.fields.dscp = dscpValue;
if (conn_->transportSettings.enableEcnOnEgress) {
if (conn_->transportSettings.useL4sEcn) {
conn_->socketTos.fields.ecn = kEcnECT1;
conn_->ecnState = ECNState::AttemptingL4S;
} else {
conn_->socketTos.fields.ecn = kEcnECT0;
conn_->ecnState = ECNState::AttemptingECN;
}
} else {
conn_->socketTos.fields.ecn = 0;
conn_->ecnState = ECNState::NotAttempted;
}
if (socket_ && socket_->isBound() &&
conn_->socketTos.value != initialTosValue) {
socket_->setTosOrTrafficClass(conn_->socketTos.value);
}
}
void QuicTransportBaseLite::validateECNState() {
if (conn_->ecnState == ECNState::NotAttempted ||
conn_->ecnState == ECNState::FailedValidation) {
// Verification not needed
return;
}
const auto& minExpectedMarkedPacketsCount =
conn_->ackStates.appDataAckState.minimumExpectedEcnMarksEchoed;
if (minExpectedMarkedPacketsCount < 10) {
// We wait for 10 ack-eliciting app data packets to be marked before trying
// to validate ECN.
return;
}
const auto& maxExpectedMarkedPacketsCount = conn_->lossState.totalPacketsSent;
auto markedPacketCount = conn_->ackStates.appDataAckState.ecnCECountEchoed;
if (conn_->ecnState == ECNState::AttemptingECN ||
conn_->ecnState == ECNState::ValidatedECN) {
// Check the number of marks seen (ECT0 + CE). ECT1 should be zero.
markedPacketCount += conn_->ackStates.appDataAckState.ecnECT0CountEchoed;
if (markedPacketCount >= minExpectedMarkedPacketsCount &&
markedPacketCount <= maxExpectedMarkedPacketsCount &&
conn_->ackStates.appDataAckState.ecnECT1CountEchoed == 0) {
if (conn_->ecnState != ECNState::ValidatedECN) {
conn_->ecnState = ECNState::ValidatedECN;
VLOG(4) << fmt::format(
"ECN validation successful. Marked {} of {} expected",
markedPacketCount,
minExpectedMarkedPacketsCount);
}
} else {
conn_->ecnState = ECNState::FailedValidation;
VLOG(4) << fmt::format(
"ECN validation failed. Marked {} of {} expected",
markedPacketCount,
minExpectedMarkedPacketsCount);
}
} else if (
conn_->ecnState == ECNState::AttemptingL4S ||
conn_->ecnState == ECNState::ValidatedL4S) {
// Check the number of marks seen (ECT1 + CE). ECT0 should be zero.
markedPacketCount += conn_->ackStates.appDataAckState.ecnECT1CountEchoed;
if (markedPacketCount >= minExpectedMarkedPacketsCount &&
markedPacketCount <= maxExpectedMarkedPacketsCount &&
conn_->ackStates.appDataAckState.ecnECT0CountEchoed == 0) {
if (conn_->ecnState != ECNState::ValidatedL4S) {
if (!conn_->ecnL4sTracker) {
conn_->ecnL4sTracker = std::make_shared<EcnL4sTracker>(*conn_);
addPacketProcessor(conn_->ecnL4sTracker);
}
conn_->ecnState = ECNState::ValidatedL4S;
VLOG(4) << fmt::format(
"L4S validation successful. Marked {} of {} expected",
markedPacketCount,
minExpectedMarkedPacketsCount);
}
} else {
conn_->ecnState = ECNState::FailedValidation;
VLOG(4) << fmt::format(
"L4S validation failed. Marked {} of {} expected",
markedPacketCount,
minExpectedMarkedPacketsCount);
}
}
if (conn_->ecnState == ECNState::FailedValidation) {
conn_->socketTos.fields.ecn = 0;
CHECK(socket_ && socket_->isBound());
socket_->setTosOrTrafficClass(conn_->socketTos.value);
VLOG(4) << "ECN validation failed. Disabling ECN";
if (conn_->ecnL4sTracker) {
conn_->packetProcessors.erase(
std::remove(
conn_->packetProcessors.begin(),
conn_->packetProcessors.end(),
conn_->ecnL4sTracker),
conn_->packetProcessors.end());
conn_->ecnL4sTracker.reset();
}
}
}
void QuicTransportBaseLite::scheduleAckTimeout() {
if (closeState_ == CloseState::CLOSED) {
return;
}
if (conn_->pendingEvents.scheduleAckTimeout) {
if (!isTimeoutScheduled(&ackTimeout_)) {
auto factoredRtt = std::chrono::duration_cast<std::chrono::microseconds>(
conn_->transportSettings.ackTimerFactor * conn_->lossState.srtt);
// If we are using ACK_FREQUENCY, disable the factored RTT heuristic
// and only use the update max ACK delay.
if (conn_->ackStates.appDataAckState.ackFrequencySequenceNumber) {
factoredRtt = conn_->ackStates.maxAckDelay;
}
auto timeout = timeMax(
std::chrono::duration_cast<std::chrono::microseconds>(
evb_->getTimerTickInterval()),
timeMin(conn_->ackStates.maxAckDelay, factoredRtt));
auto timeoutMs = folly::chrono::ceil<std::chrono::milliseconds>(timeout);
VLOG(10) << __func__ << " timeout=" << timeoutMs.count() << "ms"
<< " factoredRtt=" << factoredRtt.count() << "us" << " "
<< *this;
scheduleTimeout(&ackTimeout_, timeoutMs);
}
} else {
if (isTimeoutScheduled(&ackTimeout_)) {
VLOG(10) << __func__ << " cancel timeout " << *this;
cancelTimeout(&ackTimeout_);
}
}
}
void QuicTransportBaseLite::schedulePathValidationTimeout() {
if (closeState_ == CloseState::CLOSED) {
return;
}
if (!conn_->pendingEvents.schedulePathValidationTimeout) {
if (isTimeoutScheduled(&pathValidationTimeout_)) {
VLOG(10) << __func__ << " cancel timeout " << *this;
// This means path validation succeeded, and we should have updated to
// correct state
cancelTimeout(&pathValidationTimeout_);
}
} else if (!isTimeoutScheduled(&pathValidationTimeout_)) {
auto pto = conn_->lossState.srtt +
std::max(4 * conn_->lossState.rttvar, kGranularity) +
conn_->lossState.maxAckDelay;
auto validationTimeout =
std::max(3 * pto, 6 * conn_->transportSettings.initialRtt);
auto timeoutMs =
folly::chrono::ceil<std::chrono::milliseconds>(validationTimeout);
VLOG(10) << __func__ << " timeout=" << timeoutMs.count() << "ms " << *this;
scheduleTimeout(&pathValidationTimeout_, timeoutMs);
}
}
/**
* Getters for details from the transport/security layers such as
* RTT, rxmit, cwnd, mss, app protocol, handshake latency,
* client proposed ciphers, etc.
*/
QuicSocketLite::TransportInfo QuicTransportBaseLite::getTransportInfo() const {
CongestionControlType congestionControlType = CongestionControlType::None;
uint64_t writableBytes = std::numeric_limits<uint64_t>::max();
uint64_t congestionWindow = std::numeric_limits<uint64_t>::max();
Optional<CongestionController::State> maybeCCState;
uint64_t burstSize = 0;
std::chrono::microseconds pacingInterval = 0ms;
if (conn_->congestionController) {
congestionControlType = conn_->congestionController->type();
writableBytes = conn_->congestionController->getWritableBytes();
congestionWindow = conn_->congestionController->getCongestionWindow();
maybeCCState = conn_->congestionController->getState();
if (isConnectionPaced(*conn_)) {
burstSize = conn_->pacer->getCachedWriteBatchSize();
pacingInterval = conn_->pacer->getTimeUntilNextWrite();
}
}
TransportInfo transportInfo;
transportInfo.connectionTime = conn_->connectionTime;
transportInfo.srtt = conn_->lossState.srtt;
transportInfo.rttvar = conn_->lossState.rttvar;
transportInfo.lrtt = conn_->lossState.lrtt;
transportInfo.maybeLrtt = conn_->lossState.maybeLrtt;
transportInfo.maybeLrttAckDelay = conn_->lossState.maybeLrttAckDelay;
if (conn_->lossState.mrtt != kDefaultMinRtt) {
transportInfo.maybeMinRtt = conn_->lossState.mrtt;
}
transportInfo.maybeMinRttNoAckDelay = conn_->lossState.maybeMrttNoAckDelay;
transportInfo.mss = conn_->udpSendPacketLen;
transportInfo.congestionControlType = congestionControlType;
transportInfo.writableBytes = writableBytes;
transportInfo.congestionWindow = congestionWindow;
transportInfo.pacingBurstSize = burstSize;
transportInfo.pacingInterval = pacingInterval;
transportInfo.packetsRetransmitted = conn_->lossState.rtxCount;
transportInfo.totalPacketsSent = conn_->lossState.totalPacketsSent;
transportInfo.totalAckElicitingPacketsSent =
conn_->lossState.totalAckElicitingPacketsSent;
transportInfo.totalPacketsMarkedLost =
conn_->lossState.totalPacketsMarkedLost;
transportInfo.totalPacketsMarkedLostByTimeout =
conn_->lossState.totalPacketsMarkedLostByTimeout;
transportInfo.totalPacketsMarkedLostByReorderingThreshold =
conn_->lossState.totalPacketsMarkedLostByReorderingThreshold;
transportInfo.totalPacketsSpuriouslyMarkedLost =
conn_->lossState.totalPacketsSpuriouslyMarkedLost;
transportInfo.timeoutBasedLoss = conn_->lossState.timeoutBasedRtxCount;
transportInfo.totalBytesRetransmitted =
conn_->lossState.totalBytesRetransmitted;
transportInfo.pto = calculatePTO(*conn_);
transportInfo.bytesSent = conn_->lossState.totalBytesSent;
transportInfo.bytesAcked = conn_->lossState.totalBytesAcked;
transportInfo.bytesRecvd = conn_->lossState.totalBytesRecvd;
transportInfo.bytesInFlight = conn_->lossState.inflightBytes;
transportInfo.bodyBytesSent = conn_->lossState.totalBodyBytesSent;
transportInfo.bodyBytesAcked = conn_->lossState.totalBodyBytesAcked;
transportInfo.totalStreamBytesSent = conn_->lossState.totalStreamBytesSent;
transportInfo.totalNewStreamBytesSent =
conn_->lossState.totalNewStreamBytesSent;
transportInfo.ptoCount = conn_->lossState.ptoCount;
transportInfo.totalPTOCount = conn_->lossState.totalPTOCount;
transportInfo.largestPacketAckedByPeer =
conn_->ackStates.appDataAckState.largestAckedByPeer;
transportInfo.largestPacketSent = conn_->lossState.largestSent;
transportInfo.usedZeroRtt = conn_->usedZeroRtt;
transportInfo.maybeCCState = maybeCCState;
return transportInfo;
}
const folly::SocketAddress& QuicTransportBaseLite::getLocalAddress() const {
return socket_ && socket_->isBound() ? socket_->address()
: localFallbackAddress;
}
void QuicTransportBaseLite::handleNewStreams(
std::vector<StreamId>& streamStorage) {
const auto& newPeerStreamIds = streamStorage;
for (const auto& streamId : newPeerStreamIds) {
CHECK_NOTNULL(connCallback_.get());
if (isBidirectionalStream(streamId)) {
connCallback_->onNewBidirectionalStream(streamId);
} else {
connCallback_->onNewUnidirectionalStream(streamId);
}
logStreamOpenEvent(streamId);
if (closeState_ != CloseState::OPEN) {
return;
}
}
streamStorage.clear();
}
void QuicTransportBaseLite::handleNewGroupedStreams(
std::vector<StreamId>& streamStorage) {
const auto& newPeerStreamIds = streamStorage;
for (const auto& streamId : newPeerStreamIds) {
CHECK_NOTNULL(connCallback_.get());
auto stream = CHECK_NOTNULL(
conn_->streamManager->getStream(streamId).value_or(nullptr));
CHECK(stream->groupId);
if (isBidirectionalStream(streamId)) {
connCallback_->onNewBidirectionalStreamInGroup(
streamId, *stream->groupId);
} else {
connCallback_->onNewUnidirectionalStreamInGroup(
streamId, *stream->groupId);
}
logStreamOpenEvent(streamId);
if (closeState_ != CloseState::OPEN) {
return;
}
}
streamStorage.clear();
}
void QuicTransportBaseLite::logStreamOpenEvent(StreamId streamId) {
if (getSocketObserverContainer() &&
getSocketObserverContainer()
->hasObserversForEvent<
SocketObserverInterface::Events::streamEvents>()) {
getSocketObserverContainer()
->invokeInterfaceMethod<SocketObserverInterface::Events::streamEvents>(
[event = SocketObserverInterface::StreamOpenEvent(
streamId,
getStreamInitiator(streamId),
getStreamDirectionality(streamId))](
auto observer, auto observed) {
observer->streamOpened(observed, event);
});
}
}
bool QuicTransportBaseLite::hasDeliveryCallbacksToCall(
StreamId streamId,
uint64_t maxOffsetToDeliver) const {
auto callbacksIt = deliveryCallbacks_.find(streamId);
if (callbacksIt == deliveryCallbacks_.end() || callbacksIt->second.empty()) {
return false;
}
return (callbacksIt->second.front().offset <= maxOffsetToDeliver);
}
void QuicTransportBaseLite::updatePacketProcessorsPrewriteRequests() {
folly::SocketCmsgMap cmsgs;
for (const auto& pp : conn_->packetProcessors) {
// In case of overlapping cmsg keys, the priority is given to
// that were added to the QuicSocket first.
auto writeRequest = pp->prewrite();
if (writeRequest && writeRequest->cmsgs) {
cmsgs.insert(writeRequest->cmsgs->begin(), writeRequest->cmsgs->end());
}
}
if (!cmsgs.empty()) {
conn_->socketCmsgsState.additionalCmsgs = cmsgs;
} else {
conn_->socketCmsgsState.additionalCmsgs.reset();
}
conn_->socketCmsgsState.targetWriteCount = conn_->writeCount;
}
void QuicTransportBaseLite::updateCongestionControlSettings(
const TransportSettings& transportSettings) {
conn_->transportSettings.defaultCongestionController =
transportSettings.defaultCongestionController;
conn_->transportSettings.initCwndInMss = transportSettings.initCwndInMss;
conn_->transportSettings.minCwndInMss = transportSettings.minCwndInMss;
conn_->transportSettings.maxCwndInMss = transportSettings.maxCwndInMss;
conn_->transportSettings.limitedCwndInMss =
transportSettings.limitedCwndInMss;
conn_->transportSettings.pacingEnabled = transportSettings.pacingEnabled;
conn_->transportSettings.pacingTickInterval =
transportSettings.pacingTickInterval;
conn_->transportSettings.pacingTimerResolution =
transportSettings.pacingTimerResolution;
conn_->transportSettings.minBurstPackets = transportSettings.minBurstPackets;
conn_->transportSettings.copaDeltaParam = transportSettings.copaDeltaParam;
conn_->transportSettings.copaUseRttStanding =
transportSettings.copaUseRttStanding;
}
void QuicTransportBaseLite::describe(std::ostream& os) const {
CHECK(conn_);
os << *conn_;
}
std::ostream& operator<<(std::ostream& os, const QuicTransportBaseLite& qt) {
qt.describe(os);
return os;
}
} // namespace quic