mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-04-18 17:24:03 +03:00
Summary: This introduces a more generic typealias so that we can, for instance, write `BufHelpers::createCombined` instead of `folly::IOBuf::createCombined`. Reviewed By: jbeshay Differential Revision: D73127508 fbshipit-source-id: d585790904efc8e9f92d79cbf766bafe0e84a69f
2284 lines
83 KiB
C++
2284 lines
83 KiB
C++
/*
|
|
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
*
|
|
* This source code is licensed under the MIT license found in the
|
|
* LICENSE file in the root directory of this source tree.
|
|
*/
|
|
|
|
#include <folly/tracing/StaticTracepoint.h>
|
|
#include <quic/QuicConstants.h>
|
|
#include <quic/QuicException.h>
|
|
#include <quic/api/QuicBatchWriterFactory.h>
|
|
#include <quic/api/QuicTransportFunctions.h>
|
|
#include <quic/client/state/ClientStateMachine.h>
|
|
#include <quic/codec/QuicPacketBuilder.h>
|
|
#include <quic/codec/QuicWriteCodec.h>
|
|
#include <quic/codec/Types.h>
|
|
#include <quic/common/BufAccessor.h>
|
|
#include <quic/flowcontrol/QuicFlowController.h>
|
|
#include <quic/happyeyeballs/QuicHappyEyeballsFunctions.h>
|
|
|
|
#include <quic/state/AckHandlers.h>
|
|
#include <quic/state/QuicAckFrequencyFunctions.h>
|
|
#include <quic/state/QuicStateFunctions.h>
|
|
#include <quic/state/QuicStreamFunctions.h>
|
|
#include <quic/state/SimpleFrameFunctions.h>
|
|
|
|
namespace {
|
|
|
|
/*
|
|
* Check whether crypto has pending data.
|
|
*/
|
|
bool cryptoHasWritableData(const quic::QuicConnectionStateBase& conn) {
|
|
return (conn.initialWriteCipher &&
|
|
(!conn.cryptoState->initialStream.pendingWrites.empty() ||
|
|
!conn.cryptoState->initialStream.lossBuffer.empty())) ||
|
|
(conn.handshakeWriteCipher &&
|
|
(!conn.cryptoState->handshakeStream.pendingWrites.empty() ||
|
|
!conn.cryptoState->handshakeStream.lossBuffer.empty())) ||
|
|
(conn.oneRttWriteCipher &&
|
|
(!conn.cryptoState->oneRttStream.pendingWrites.empty() ||
|
|
!conn.cryptoState->oneRttStream.lossBuffer.empty()));
|
|
}
|
|
|
|
// Renders an optional packet number as a decimal string, or "-" when unset.
std::string optionalToString(const quic::Optional<quic::PacketNum>& packetNum) {
  return packetNum ? folly::to<std::string>(*packetNum) : std::string("-");
}
|
|
|
|
std::string largestAckScheduledToString(
|
|
const quic::QuicConnectionStateBase& conn) noexcept {
|
|
return folly::to<std::string>(
|
|
"[",
|
|
optionalToString(
|
|
conn.ackStates.initialAckState
|
|
? conn.ackStates.initialAckState->largestAckScheduled
|
|
: quic::none),
|
|
",",
|
|
optionalToString(
|
|
conn.ackStates.handshakeAckState
|
|
? conn.ackStates.handshakeAckState->largestAckScheduled
|
|
: quic::none),
|
|
",",
|
|
optionalToString(conn.ackStates.appDataAckState.largestAckScheduled),
|
|
"]");
|
|
}
|
|
|
|
std::string largestAckToSendToString(
|
|
const quic::QuicConnectionStateBase& conn) noexcept {
|
|
return folly::to<std::string>(
|
|
"[",
|
|
optionalToString(
|
|
conn.ackStates.initialAckState
|
|
? largestAckToSend(*conn.ackStates.initialAckState)
|
|
: quic::none),
|
|
",",
|
|
optionalToString(
|
|
conn.ackStates.handshakeAckState
|
|
? largestAckToSend(*conn.ackStates.handshakeAckState)
|
|
: quic::none),
|
|
",",
|
|
optionalToString(largestAckToSend(conn.ackStates.appDataAckState)),
|
|
"]");
|
|
}
|
|
|
|
using namespace quic;
|
|
|
|
/**
|
|
* This function returns the number of write bytes that are available until we
|
|
* reach the writableBytesLimit. It may or may not be the limiting factor on the
|
|
* number of bytes we can write on the wire.
|
|
*
|
|
* If the client's address has not been verified, this will return the number of
|
|
* write bytes available until writableBytesLimit is reached.
|
|
*
|
|
* Otherwise if the client's address is validated, it will return unlimited
|
|
* number of bytes to write.
|
|
*/
|
|
uint64_t maybeUnvalidatedClientWritableBytes(
|
|
quic::QuicConnectionStateBase& conn) {
|
|
if (!conn.writableBytesLimit) {
|
|
return unlimitedWritableBytes(conn);
|
|
}
|
|
|
|
if (*conn.writableBytesLimit <= conn.lossState.totalBytesSent) {
|
|
QUIC_STATS(conn.statsCallback, onConnectionWritableBytesLimited);
|
|
return 0;
|
|
}
|
|
|
|
uint64_t writableBytes =
|
|
*conn.writableBytesLimit - conn.lossState.totalBytesSent;
|
|
|
|
// round the result up to the nearest multiple of udpSendPacketLen.
|
|
return (writableBytes + conn.udpSendPacketLen - 1) / conn.udpSendPacketLen *
|
|
conn.udpSendPacketLen;
|
|
}
|
|
|
|
/**
 * Writes AppData (short-header) packets to the socket.
 *
 * First flushes any pending probe (PTO) packets for the AppData space —
 * possibly exceeding packetLimit — then writes regular scheduled frames up
 * to the remaining packetLimit. When exceptCryptoStream is true, crypto
 * frames are excluded from both schedulers (the caller handles crypto data
 * separately).
 *
 * Returns the accumulated packets/probes/bytes written, or a QuicError
 * propagated from the underlying write paths.
 */
folly::Expected<WriteQuicDataResult, QuicError> writeQuicDataToSocketImpl(
    QuicAsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    QuicVersion version,
    uint64_t packetLimit,
    bool exceptCryptoStream,
    TimePoint writeLoopBeginTime) {
  auto builder = ShortHeaderBuilder(connection.oneRttWritePhase);
  WriteQuicDataResult result;
  // Aliases into `result` so the probe and regular phases below can both
  // accumulate into the same counters.
  auto& packetsWritten = result.packetsWritten;
  auto& probesWritten = result.probesWritten;
  auto& bytesWritten = result.bytesWritten;
  auto& numProbePackets =
      connection.pendingEvents.numProbePackets[PacketNumberSpace::AppData];
  if (numProbePackets) {
    auto probeSchedulerBuilder =
        FrameScheduler::Builder(
            connection,
            EncryptionLevel::AppData,
            PacketNumberSpace::AppData,
            exceptCryptoStream ? "ProbeWithoutCrypto" : "ProbeScheduler")
            .blockedFrames()
            .windowUpdateFrames()
            .simpleFrames()
            .resetFrames()
            .streamFrames()
            .pingFrames()
            .immediateAckFrames();
    if (!exceptCryptoStream) {
      probeSchedulerBuilder.cryptoFrames();
    }
    auto probeScheduler = std::move(probeSchedulerBuilder).build();
    auto probeResult = writeProbingDataToSocket(
        sock,
        connection,
        srcConnId,
        dstConnId,
        builder,
        EncryptionLevel::AppData,
        PacketNumberSpace::AppData,
        probeScheduler,
        numProbePackets, // This possibly bypasses the packetLimit.
        aead,
        headerCipher,
        version);
    if (!probeResult.hasValue()) {
      return folly::makeUnexpected(probeResult.error());
    }
    probesWritten = probeResult->probesWritten;
    bytesWritten += probeResult->bytesWritten;
    // We only get one chance to write out the probes.
    numProbePackets = 0;
    // Probes consume the packet budget for the regular write phase below;
    // clamp at zero if probes alone exceeded the limit.
    packetLimit =
        probesWritten > packetLimit ? 0 : (packetLimit - probesWritten);
  }
  auto schedulerBuilder =
      FrameScheduler::Builder(
          connection,
          EncryptionLevel::AppData,
          PacketNumberSpace::AppData,
          exceptCryptoStream ? "FrameSchedulerWithoutCrypto" : "FrameScheduler")
          .streamFrames()
          .resetFrames()
          .windowUpdateFrames()
          .blockedFrames()
          .simpleFrames()
          .pingFrames()
          .datagramFrames()
          .immediateAckFrames();
  // Only add ACK frames if we need to send an ACK.
  if (connection.transportSettings.opportunisticAcking ||
      toWriteAppDataAcks(connection)) {
    schedulerBuilder.ackFrames();
  }
  if (!exceptCryptoStream) {
    schedulerBuilder.cryptoFrames();
  }
  FrameScheduler scheduler = std::move(schedulerBuilder).build();
  auto connectionDataResult = writeConnectionDataToSocket(
      sock,
      connection,
      srcConnId,
      dstConnId,
      std::move(builder),
      PacketNumberSpace::AppData,
      scheduler,
      congestionControlWritableBytes,
      packetLimit,
      aead,
      headerCipher,
      version,
      writeLoopBeginTime);
  if (!connectionDataResult.hasValue()) {
    return folly::makeUnexpected(connectionDataResult.error());
  }
  packetsWritten += connectionDataResult->packetsWritten;
  bytesWritten += connectionDataResult->bytesWritten;
  VLOG_IF(10, packetsWritten || probesWritten)
      << nodeToString(connection.nodeType) << " written data "
      << (exceptCryptoStream ? "without crypto data " : "")
      << "to socket packets=" << packetsWritten << " probes=" << probesWritten
      << " " << connection;
  return result;
}
|
|
|
|
// Folds the batch writer's last retryable errno into the connection's
// error counters: EAGAIN/EWOULDBLOCK and ENOBUFS are tracked separately.
void updateErrnoCount(
    QuicConnectionStateBase& connection,
    IOBufQuicBatch& ioBufBatch) {
  const int err = ioBufBatch.getLastRetryableErrno();
  if (err == EAGAIN || err == EWOULDBLOCK) {
    ++connection.eagainOrEwouldblockCount;
  } else if (err == ENOBUFS) {
    ++connection.enobufsCount;
  }
}
|
|
|
|
/**
 * Builds, schedules, and encrypts one packet directly inside the
 * connection's contiguous write buffer (the "continuous memory" datapath).
 *
 * Frames are written in place via InplaceQuicPacketBuilder, the body is
 * encrypted in place, and the packet is handed to ioBufBatch by length
 * only — the batch writer shares the same underlying buffer.
 *
 * Returns a build failure (after rolling back the buffer and flushing the
 * batch) when no frame or no body could be written, an error if the
 * scheduler fails, and a write result carrying the encoded sizes
 * otherwise.
 */
[[nodiscard]] folly::Expected<DataPathResult, QuicError>
continuousMemoryBuildScheduleEncrypt(
    QuicConnectionStateBase& connection,
    PacketHeader header,
    PacketNumberSpace pnSpace,
    PacketNum packetNum,
    uint64_t cipherOverhead,
    QuicPacketScheduler& scheduler,
    uint64_t writableBytes,
    IOBufQuicBatch& ioBufBatch,
    const Aead& aead,
    const PacketNumberCipher& headerCipher) {
  // Bytes already in the buffer from previously built, not-yet-flushed
  // packets. Everything appended past this point belongs to this packet.
  auto prevSize = connection.bufAccessor->length();

  // Discards whatever this packet appended, restoring the buffer to
  // prevSize.
  auto rollbackBuf = [&]() {
    connection.bufAccessor->trimEnd(
        connection.bufAccessor->length() - prevSize);
  };

  // It's the scheduler's job to invoke encode header
  InplaceQuicPacketBuilder pktBuilder(
      *connection.bufAccessor,
      connection.udpSendPacketLen,
      std::move(header),
      getAckState(connection, pnSpace).largestAckedByPeer.value_or(0));
  pktBuilder.accountForCipherOverhead(cipherOverhead);
  CHECK(scheduler.hasData());
  auto result =
      scheduler.scheduleFramesForPacket(std::move(pktBuilder), writableBytes);
  if (result.hasError()) {
    return folly::makeUnexpected(result.error());
  }
  CHECK(connection.bufAccessor->ownsBuffer());
  auto& packet = result->packet;
  if (!packet || packet->packet.frames.empty()) {
    // Nothing was scheduled into the packet: undo the build and flush what
    // has been batched so far.
    rollbackBuf();
    ioBufBatch.flush();
    updateErrnoCount(connection, ioBufBatch);
    if (connection.loopDetectorCallback) {
      connection.writeDebugState.noWriteReason = NoWriteReason::NO_FRAME;
    }
    return DataPathResult::makeBuildFailure();
  }
  if (packet->body.empty()) {
    // No more space remaining.
    rollbackBuf();
    ioBufBatch.flush();
    updateErrnoCount(connection, ioBufBatch);
    if (connection.loopDetectorCallback) {
      connection.writeDebugState.noWriteReason = NoWriteReason::NO_BODY;
    }
    return DataPathResult::makeBuildFailure();
  }
  CHECK(!packet->header.isChained());
  auto headerLen = packet->header.length();
  // Sanity: both body and header must point into the connection's
  // contiguous buffer for in-place encryption to be valid.
  CHECK(
      packet->body.data() > connection.bufAccessor->data() &&
      packet->body.tail() <= connection.bufAccessor->tail());
  CHECK(
      packet->header.data() >= connection.bufAccessor->data() &&
      packet->header.tail() < connection.bufAccessor->tail());
  // Trim off everything before the current packet, and the header length, so
  // buf's data starts from the body part of buf.
  connection.bufAccessor->trimStart(prevSize + headerLen);
  // buf and packetBuf is actually the same.
  auto buf = connection.bufAccessor->obtain();
  auto packetBuf =
      aead.inplaceEncrypt(std::move(buf), &packet->header, packetNum);
  CHECK(packetBuf->headroom() == headerLen + prevSize);
  // Include header back.
  packetBuf->prepend(headerLen);

  HeaderForm headerForm = packet->packet.header.getHeaderForm();
  encryptPacketHeader(
      headerForm,
      packetBuf->writableData(),
      headerLen,
      packetBuf->data() + headerLen,
      packetBuf->length() - headerLen,
      headerCipher);
  CHECK(!packetBuf->isChained());
  auto encodedSize = packetBuf->length();
  auto encodedBodySize = encodedSize - headerLen;
  // Include previous packets back.
  packetBuf->prepend(prevSize);
  // Return ownership of the (shared) buffer to the accessor.
  connection.bufAccessor->release(std::move(packetBuf));
  if (encodedSize > connection.udpSendPacketLen) {
    VLOG(3) << "Quic sending pkt larger than limit, encodedSize="
            << encodedSize;
  }
  // TODO: I think we should add an API that doesn't need a buffer.
  bool ret = ioBufBatch.write(nullptr /* no need to pass buf */, encodedSize);
  updateErrnoCount(connection, ioBufBatch);
  return DataPathResult::makeWriteResult(
      ret, std::move(result.value()), encodedSize, encodedBodySize);
}
|
|
|
|
/**
 * Builds, schedules, and encrypts one packet on the IOBuf-chain datapath:
 * frames are built via RegularQuicPacketBuilder, the body is copied into a
 * freshly allocated combined buffer, encrypted, and the finished packet
 * IOBuf is handed to ioBufBatch by value.
 *
 * Returns a build failure (after flushing the batch) when no frame or no
 * body could be written, an error if the scheduler fails, and a write
 * result carrying the encoded sizes otherwise.
 */
[[nodiscard]] folly::Expected<DataPathResult, QuicError>
iobufChainBasedBuildScheduleEncrypt(
    QuicConnectionStateBase& connection,
    PacketHeader header,
    PacketNumberSpace pnSpace,
    PacketNum packetNum,
    uint64_t cipherOverhead,
    QuicPacketScheduler& scheduler,
    uint64_t writableBytes,
    IOBufQuicBatch& ioBufBatch,
    const Aead& aead,
    const PacketNumberCipher& headerCipher) {
  RegularQuicPacketBuilder pktBuilder(
      connection.udpSendPacketLen,
      std::move(header),
      getAckState(connection, pnSpace).largestAckedByPeer.value_or(0));
  // It's the scheduler's job to invoke encode header
  pktBuilder.accountForCipherOverhead(cipherOverhead);
  auto result =
      scheduler.scheduleFramesForPacket(std::move(pktBuilder), writableBytes);
  if (result.hasError()) {
    return folly::makeUnexpected(result.error());
  }
  auto& packet = result->packet;
  if (!packet || packet->packet.frames.empty()) {
    // Nothing was scheduled: flush what has been batched so far.
    ioBufBatch.flush();
    updateErrnoCount(connection, ioBufBatch);
    if (connection.loopDetectorCallback) {
      connection.writeDebugState.noWriteReason = NoWriteReason::NO_FRAME;
    }
    return DataPathResult::makeBuildFailure();
  }
  if (packet->body.empty()) {
    // No more space remaining.
    ioBufBatch.flush();
    updateErrnoCount(connection, ioBufBatch);
    if (connection.loopDetectorCallback) {
      connection.writeDebugState.noWriteReason = NoWriteReason::NO_BODY;
    }
    return DataPathResult::makeBuildFailure();
  }
  packet->header.coalesce();
  auto headerLen = packet->header.length();
  auto bodyLen = packet->body.computeChainDataLength();
  // Single flat buffer sized for header + body + AEAD tag.
  auto unencrypted = BufHelpers::createCombined(
      headerLen + bodyLen + aead.getCipherOverhead());
  // Copy the (possibly chained) body after a headerLen-sized headroom, then
  // advance/append so the buffer's data window covers exactly the body.
  auto bodyCursor = folly::io::Cursor(&packet->body);
  bodyCursor.pull(unencrypted->writableData() + headerLen, bodyLen);
  unencrypted->advance(headerLen);
  unencrypted->append(bodyLen);
  auto packetBuf =
      aead.inplaceEncrypt(std::move(unencrypted), &packet->header, packetNum);
  DCHECK(packetBuf->headroom() == headerLen);
  // Reset the data window to the start of the buffer, then copy in the
  // header in front of the encrypted body.
  packetBuf->clear();
  auto headerCursor = folly::io::Cursor(&packet->header);
  headerCursor.pull(packetBuf->writableData(), headerLen);
  packetBuf->append(headerLen + bodyLen + aead.getCipherOverhead());

  HeaderForm headerForm = packet->packet.header.getHeaderForm();
  encryptPacketHeader(
      headerForm,
      packetBuf->writableData(),
      headerLen,
      packetBuf->data() + headerLen,
      packetBuf->length() - headerLen,
      headerCipher);
  auto encodedSize = packetBuf->computeChainDataLength();
  auto encodedBodySize = encodedSize - headerLen;
  if (encodedSize > connection.udpSendPacketLen) {
    VLOG(3) << "Quic sending pkt larger than limit, encodedSize=" << encodedSize
            << " encodedBodySize=" << encodedBodySize;
  }
  bool ret = ioBufBatch.write(std::move(packetBuf), encodedSize);
  updateErrnoCount(connection, ioBufBatch);
  return DataPathResult::makeWriteResult(
      ret, std::move(result.value()), encodedSize, encodedBodySize);
}
|
|
|
|
} // namespace
|
|
|
|
namespace quic {
|
|
|
|
// Forward declarations for the BufMeta (DSR) write handlers defined later
// in this file; they are referenced before their definitions.
void handleNewStreamBufMetaWritten(
    QuicStreamState& stream,
    uint64_t frameLen,
    bool frameFin);

void handleRetransmissionBufMetaWritten(
    QuicStreamState& stream,
    uint64_t frameOffset,
    uint64_t frameLen,
    bool frameFin,
    const decltype(stream.lossBufMetas)::iterator lossBufMetaIter);
|
|
|
|
// Returns true while the write loop is still within its time budget — a
// fraction (srtt / writeLimitRttFraction) of the smoothed RTT measured from
// loopBeginTime. Always true when srtt or the fraction is unset (0), i.e.
// the limit is disabled.
bool writeLoopTimeLimit(
    TimePoint loopBeginTime,
    const QuicConnectionStateBase& connection) {
  const auto srtt = connection.lossState.srtt;
  const auto rttFraction = connection.transportSettings.writeLimitRttFraction;
  if (srtt == 0us || rttFraction == 0) {
    return true;
  }
  return Clock::now() - loopBeginTime < srtt / rttFraction;
}
|
|
|
|
// Records that frameLen bytes of brand-new stream data (optionally followed
// by a FIN) were written: moves the bytes from pendingWrites into the
// retransmission buffer and advances currentWriteOffset past the data (and
// the FIN, if any).
void handleNewStreamDataWritten(
    QuicStreamLike& stream,
    uint64_t frameLen,
    bool frameFin) {
  const auto writtenOffset = stream.currentWriteOffset;
  // Ideally we should also check this data doesn't exist in either the retx
  // buffer or the loss buffer, but that's an expensive search.
  stream.currentWriteOffset += frameLen;
  ChainedByteRangeHead writtenData(
      stream.pendingWrites.splitAtMost(folly::to<size_t>(frameLen)));
  DCHECK_EQ(writtenData.chainLength(), frameLen);
  // TODO: If we want to be able to write FIN out of order for DSR-ed
  // streams, this needs to be fixed:
  if (frameFin) {
    stream.currentWriteOffset += 1;
  }
  const bool inserted =
      stream.retransmissionBuffer
          .emplace(
              std::piecewise_construct,
              std::forward_as_tuple(writtenOffset),
              std::forward_as_tuple(std::make_unique<WriteStreamBuffer>(
                  std::move(writtenData), writtenOffset, frameFin)))
          .second;
  CHECK(inserted);
}
|
|
|
|
// Records that frameLen bytes of brand-new BufMeta (DSR) data, optionally
// followed by a FIN, were written: splits the written range off
// writeBufMeta and tracks it in retransmissionBufMetas.
void handleNewStreamBufMetaWritten(
    QuicStreamState& stream,
    uint64_t frameLen,
    bool frameFin) {
  CHECK_GT(stream.writeBufMeta.offset, 0);
  const auto writtenOffset = stream.writeBufMeta.offset;
  auto writtenMeta = stream.writeBufMeta.split(frameLen);
  CHECK_EQ(writtenMeta.offset, writtenOffset);
  if (frameFin) {
    // A written FIN implies writeBufMeta is fully drained; bump the offset
    // one past the FIN, beyond the stream's final write offset.
    CHECK_EQ(0, stream.writeBufMeta.length);
    ++stream.writeBufMeta.offset;
    CHECK_GT(stream.writeBufMeta.offset, *stream.finalWriteOffset);
  }
  const bool inserted =
      stream.retransmissionBufMetas
          .emplace(
              std::piecewise_construct,
              std::forward_as_tuple(writtenOffset),
              std::forward_as_tuple(writtenMeta))
          .second;
  CHECK(inserted);
}
|
|
|
|
/**
 * Moves a just-retransmitted range from the stream's loss buffer into its
 * retransmission buffer.
 *
 * @param stream          Stream whose loss-buffer entry was retransmitted.
 * @param frameOffset     Stream offset of the first retransmitted byte.
 * @param frameLen        Number of retransmitted bytes.
 * @param frameFin        Whether the written frame carried a FIN.
 * @param lossBufferIter  Loss-buffer entry the retransmission came from;
 *                        erased (full retx) or trimmed (partial retx).
 */
void handleRetransmissionWritten(
    QuicStreamLike& stream,
    uint64_t frameOffset,
    uint64_t frameLen,
    bool frameFin,
    CircularDeque<WriteStreamBuffer>::iterator lossBufferIter) {
  // Both branches insert the written range into the retransmission buffer
  // the same way; factor the insertion out to avoid duplicating it.
  // Insertion must succeed — a duplicate offset indicates corrupted state.
  auto emplaceRetx = [&](ChainedByteRangeHead&& bufWritten) {
    CHECK(stream.retransmissionBuffer
              .emplace(
                  std::piecewise_construct,
                  std::forward_as_tuple(frameOffset),
                  std::forward_as_tuple(std::make_unique<WriteStreamBuffer>(
                      std::move(bufWritten), frameOffset, frameFin)))
              .second);
  };

  auto bufferLen = lossBufferIter->data.chainLength();
  if (frameLen == bufferLen && frameFin == lossBufferIter->eof) {
    // The buffer is entirely retransmitted: move its data out and drop the
    // loss-buffer entry.
    ChainedByteRangeHead bufWritten(std::move(lossBufferIter->data));
    stream.lossBuffer.erase(lossBufferIter);
    emplaceRetx(std::move(bufWritten));
  } else {
    // Partial retransmission: carve the written prefix off the loss-buffer
    // entry and advance its offset past the retransmitted bytes.
    lossBufferIter->offset += frameLen;
    emplaceRetx(
        ChainedByteRangeHead(lossBufferIter->data.splitAtMost(frameLen)));
  }
}
|
|
|
|
// Records that a lost BufMeta (DSR) range was retransmitted: removes or
// trims the loss entry and tracks the written range in
// retransmissionBufMetas.
void handleRetransmissionBufMetaWritten(
    QuicStreamState& stream,
    uint64_t frameOffset,
    uint64_t frameLen,
    bool frameFin,
    const decltype(stream.lossBufMetas)::iterator lossBufMetaIter) {
  const bool fullyRetransmitted =
      frameLen == lossBufMetaIter->length && frameFin == lossBufMetaIter->eof;
  if (fullyRetransmitted) {
    stream.lossBufMetas.erase(lossBufMetaIter);
  } else {
    // Partial retransmission: trim the written prefix off the loss entry.
    CHECK_GT(lossBufMetaIter->length, frameLen);
    lossBufMetaIter->length -= frameLen;
    lossBufMetaIter->offset += frameLen;
  }
  const bool inserted =
      stream.retransmissionBufMetas
          .emplace(
              std::piecewise_construct,
              std::forward_as_tuple(frameOffset),
              std::forward_as_tuple(WriteBufferMeta::Builder()
                                        .setOffset(frameOffset)
                                        .setLength(frameLen)
                                        .setEOF(frameFin)
                                        .build()))
          .second;
  CHECK(inserted);
}
|
|
|
|
/**
 * Update the connection and stream state after stream data is written and
 * deal with new data, as well as retransmissions. Returns true if the data
 * sent is new data, false for retransmissions and clone writes; returns an
 * INTERNAL_ERROR QuicError if the frame's offset is beyond the stream's
 * current write offset.
 */
folly::Expected<bool, QuicError> handleStreamWritten(
    QuicConnectionStateBase& conn,
    QuicStreamLike& stream,
    uint64_t frameOffset,
    uint64_t frameLen,
    bool frameFin,
    PacketNum packetNum,
    PacketNumberSpace packetNumberSpace) {
  auto writtenNewData = false;
  // Handle new data first: a frame starting exactly at currentWriteOffset
  // carries never-before-sent bytes.
  if (frameOffset == stream.currentWriteOffset) {
    handleNewStreamDataWritten(stream, frameLen, frameFin);
    writtenNewData = true;
  } else if (frameOffset > stream.currentWriteOffset) {
    // A gap between the stream's write offset and the frame can only be a
    // bug in the write path.
    return folly::makeUnexpected(QuicError(
        TransportErrorCode::INTERNAL_ERROR,
        fmt::format(
            "Byte offset of first byte in written stream frame ({}) is "
            "greater than stream's current write offset ({})",
            frameOffset,
            stream.currentWriteOffset)));
  }

  if (writtenNewData) {
    // Count packet. It's based on the assumption that the scheduling scheme
    // will only write one STREAM frame for a stream in a packet. If that
    // doesn't hold, we need to avoid double-counting.
    ++stream.numPacketsTxWithNewData;
    VLOG(10) << nodeToString(conn.nodeType) << " sent"
             << " packetNum=" << packetNum << " space=" << packetNumberSpace
             << " " << conn;
    return true;
  }

  bool writtenRetx = false;
  // If the data is in the loss buffer, it is a retransmission.
  // lossBuffer is kept sorted by offset, so binary-search for the frame.
  auto lossBufferIter = std::lower_bound(
      stream.lossBuffer.begin(),
      stream.lossBuffer.end(),
      frameOffset,
      [](const auto& buf, auto off) { return buf.offset < off; });
  if (lossBufferIter != stream.lossBuffer.end() &&
      lossBufferIter->offset == frameOffset) {
    handleRetransmissionWritten(
        stream, frameOffset, frameLen, frameFin, lossBufferIter);
    writtenRetx = true;
  }

  if (writtenRetx) {
    conn.lossState.totalBytesRetransmitted += frameLen;
    VLOG(10) << nodeToString(conn.nodeType) << " sent retransmission"
             << " packetNum=" << packetNum << " " << conn;
    QUIC_STATS(conn.statsCallback, onPacketRetransmission);
    return false;
  }

  // Otherwise it must be a clone write: neither new data nor a loss-buffer
  // retransmission.
  conn.lossState.totalStreamBytesCloned += frameLen;
  return false;
}
|
|
|
|
/**
 * BufMeta (DSR) counterpart of handleStreamWritten: updates stream and
 * connection state after a BufMeta range was written. Returns true if the
 * written range was new data, false for retransmissions.
 *
 * Unlike the regular path, BufMeta data is never cloned, so anything that
 * isn't new data MUST be found in lossBufMetas (enforced by CHECKs).
 */
bool handleStreamBufMetaWritten(
    QuicConnectionStateBase& conn,
    QuicStreamState& stream,
    uint64_t frameOffset,
    uint64_t frameLen,
    bool frameFin,
    PacketNum packetNum,
    PacketNumberSpace packetNumberSpace) {
  auto writtenNewData = false;
  // Handle new data first. writeBufMeta.offset == 0 means no BufMeta data
  // was ever attached to this stream.
  if (stream.writeBufMeta.offset > 0 &&
      frameOffset == stream.writeBufMeta.offset) {
    handleNewStreamBufMetaWritten(stream, frameLen, frameFin);
    writtenNewData = true;
  }

  if (writtenNewData) {
    // Count packet. It's based on the assumption that the scheduling scheme
    // will only write one STREAM frame for a stream in a packet. If that
    // doesn't hold, we need to avoid double-counting.
    ++stream.numPacketsTxWithNewData;
    VLOG(10) << nodeToString(conn.nodeType) << " sent"
             << " packetNum=" << packetNum << " space=" << packetNumberSpace
             << " " << conn;
    return true;
  }

  // lossBufMetas is kept sorted by offset, so binary-search for the frame.
  auto lossBufMetaIter = std::lower_bound(
      stream.lossBufMetas.begin(),
      stream.lossBufMetas.end(),
      frameOffset,
      [](const auto& bufMeta, auto offset) { return bufMeta.offset < offset; });
  // We do not clone BufMeta right now. So the data has to be in lossBufMetas.
  CHECK(lossBufMetaIter != stream.lossBufMetas.end());
  CHECK_EQ(lossBufMetaIter->offset, frameOffset);
  handleRetransmissionBufMetaWritten(
      stream, frameOffset, frameLen, frameFin, lossBufMetaIter);
  conn.lossState.totalBytesRetransmitted += frameLen;
  VLOG(10) << nodeToString(conn.nodeType) << " sent retransmission"
           << " packetNum=" << packetNum << " " << conn;
  QUIC_STATS(conn.statsCallback, onPacketRetransmission);
  return false;
}
|
|
|
|
folly::Expected<folly::Unit, QuicError> updateConnection(
|
|
QuicConnectionStateBase& conn,
|
|
Optional<ClonedPacketIdentifier> clonedPacketIdentifier,
|
|
RegularQuicWritePacket packet,
|
|
TimePoint sentTime,
|
|
uint32_t encodedSize,
|
|
uint32_t encodedBodySize,
|
|
bool isDSRPacket) {
|
|
auto packetNum = packet.header.getPacketSequenceNum();
|
|
// AckFrame, PaddingFrame and Datagrams are not retx-able.
|
|
bool retransmittable = false;
|
|
bool isPing = false;
|
|
uint32_t connWindowUpdateSent = 0;
|
|
uint32_t ackFrameCounter = 0;
|
|
uint32_t streamBytesSent = 0;
|
|
uint32_t newStreamBytesSent = 0;
|
|
OutstandingPacketWrapper::Metadata::DetailsPerStream detailsPerStream;
|
|
auto packetNumberSpace = packet.header.getPacketNumberSpace();
|
|
VLOG(10) << nodeToString(conn.nodeType) << " sent packetNum=" << packetNum
|
|
<< " in space=" << packetNumberSpace << " size=" << encodedSize
|
|
<< " bodySize: " << encodedBodySize << " isDSR=" << isDSRPacket
|
|
<< " " << conn;
|
|
if (conn.qLogger) {
|
|
conn.qLogger->addPacket(packet, encodedSize);
|
|
}
|
|
FOLLY_SDT(quic, update_connection_num_frames, packet.frames.size());
|
|
for (const auto& frame : packet.frames) {
|
|
switch (frame.type()) {
|
|
case QuicWriteFrame::Type::WriteStreamFrame: {
|
|
const WriteStreamFrame& writeStreamFrame = *frame.asWriteStreamFrame();
|
|
retransmittable = true;
|
|
auto streamResult =
|
|
conn.streamManager->getStream(writeStreamFrame.streamId);
|
|
if (!streamResult) {
|
|
return folly::makeUnexpected(streamResult.error());
|
|
}
|
|
auto stream = streamResult.value();
|
|
bool newStreamDataWritten = false;
|
|
if (writeStreamFrame.fromBufMeta) {
|
|
newStreamDataWritten = handleStreamBufMetaWritten(
|
|
conn,
|
|
*stream,
|
|
writeStreamFrame.offset,
|
|
writeStreamFrame.len,
|
|
writeStreamFrame.fin,
|
|
packetNum,
|
|
packetNumberSpace);
|
|
} else {
|
|
auto streamWrittenResult = handleStreamWritten(
|
|
conn,
|
|
*stream,
|
|
writeStreamFrame.offset,
|
|
writeStreamFrame.len,
|
|
writeStreamFrame.fin,
|
|
packetNum,
|
|
packetNumberSpace);
|
|
if (streamWrittenResult.hasError()) {
|
|
return folly::makeUnexpected(streamWrittenResult.error());
|
|
}
|
|
newStreamDataWritten = streamWrittenResult.value();
|
|
}
|
|
if (newStreamDataWritten) {
|
|
auto flowControlResult =
|
|
updateFlowControlOnWriteToSocket(*stream, writeStreamFrame.len);
|
|
if (flowControlResult.hasError()) {
|
|
return folly::makeUnexpected(flowControlResult.error());
|
|
}
|
|
maybeWriteBlockAfterSocketWrite(*stream);
|
|
maybeWriteDataBlockedAfterSocketWrite(conn);
|
|
conn.streamManager->addTx(writeStreamFrame.streamId);
|
|
newStreamBytesSent += writeStreamFrame.len;
|
|
}
|
|
// This call could take an argument whether the packet scheduler already
|
|
// removed stream from writeQueue
|
|
conn.streamManager->updateWritableStreams(
|
|
*stream, getSendConnFlowControlBytesWire(conn) > 0);
|
|
streamBytesSent += writeStreamFrame.len;
|
|
detailsPerStream.addFrame(writeStreamFrame, newStreamDataWritten);
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::WriteCryptoFrame: {
|
|
const WriteCryptoFrame& writeCryptoFrame = *frame.asWriteCryptoFrame();
|
|
retransmittable = true;
|
|
auto protectionType = packet.header.getProtectionType();
|
|
// NewSessionTicket is sent in crypto frame encrypted with 1-rtt key,
|
|
// however, it is not part of handshake
|
|
auto encryptionLevel = protectionTypeToEncryptionLevel(protectionType);
|
|
auto cryptoWritten = handleStreamWritten(
|
|
conn,
|
|
*getCryptoStream(*conn.cryptoState, encryptionLevel),
|
|
writeCryptoFrame.offset,
|
|
writeCryptoFrame.len,
|
|
false /* fin */,
|
|
packetNum,
|
|
packetNumberSpace);
|
|
if (cryptoWritten.hasError()) {
|
|
return folly::makeUnexpected(cryptoWritten.error());
|
|
}
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::WriteAckFrame: {
|
|
const WriteAckFrame& writeAckFrame = *frame.asWriteAckFrame();
|
|
DCHECK(!ackFrameCounter++)
|
|
<< "Send more than one WriteAckFrame " << conn;
|
|
auto largestAckedPacketWritten = writeAckFrame.ackBlocks.front().end;
|
|
VLOG(10) << nodeToString(conn.nodeType)
|
|
<< " sent packet with largestAcked="
|
|
<< largestAckedPacketWritten << " packetNum=" << packetNum
|
|
<< " " << conn;
|
|
++conn.numAckFramesSent;
|
|
updateAckSendStateOnSentPacketWithAcks(
|
|
conn,
|
|
getAckState(conn, packetNumberSpace),
|
|
largestAckedPacketWritten);
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::RstStreamFrame: {
|
|
const RstStreamFrame& rstStreamFrame = *frame.asRstStreamFrame();
|
|
retransmittable = true;
|
|
VLOG(10) << nodeToString(conn.nodeType)
|
|
<< " sent reset streams in packetNum=" << packetNum << " "
|
|
<< conn;
|
|
auto resetIter =
|
|
conn.pendingEvents.resets.find(rstStreamFrame.streamId);
|
|
// TODO: this can happen because we clone RST_STREAM frames. Should we
|
|
// start to treat RST_STREAM in the same way we treat window update?
|
|
if (resetIter != conn.pendingEvents.resets.end()) {
|
|
conn.pendingEvents.resets.erase(resetIter);
|
|
} else {
|
|
DCHECK(clonedPacketIdentifier.has_value())
|
|
<< " reset missing from pendingEvents for non-clone packet";
|
|
}
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::MaxDataFrame: {
|
|
const MaxDataFrame& maxDataFrame = *frame.asMaxDataFrame();
|
|
CHECK(!connWindowUpdateSent++)
|
|
<< "Send more than one connection window update " << conn;
|
|
VLOG(10) << nodeToString(conn.nodeType)
|
|
<< " sent conn window update packetNum=" << packetNum << " "
|
|
<< conn;
|
|
retransmittable = true;
|
|
VLOG(10) << nodeToString(conn.nodeType)
|
|
<< " sent conn window update in packetNum=" << packetNum << " "
|
|
<< conn;
|
|
++conn.numWindowUpdateFramesSent;
|
|
onConnWindowUpdateSent(conn, maxDataFrame.maximumData, sentTime);
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::DataBlockedFrame: {
|
|
VLOG(10) << nodeToString(conn.nodeType)
|
|
<< " sent conn data blocked frame=" << packetNum << " "
|
|
<< conn;
|
|
retransmittable = true;
|
|
conn.pendingEvents.sendDataBlocked = false;
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::MaxStreamDataFrame: {
|
|
const MaxStreamDataFrame& maxStreamDataFrame =
|
|
*frame.asMaxStreamDataFrame();
|
|
auto streamResult =
|
|
conn.streamManager->getStream(maxStreamDataFrame.streamId);
|
|
if (streamResult.hasError()) {
|
|
return folly::makeUnexpected(streamResult.error());
|
|
}
|
|
auto stream = streamResult.value();
|
|
retransmittable = true;
|
|
VLOG(10) << nodeToString(conn.nodeType)
|
|
<< " sent packet with window update packetNum=" << packetNum
|
|
<< " stream=" << maxStreamDataFrame.streamId << " " << conn;
|
|
++conn.numWindowUpdateFramesSent;
|
|
onStreamWindowUpdateSent(
|
|
*stream, maxStreamDataFrame.maximumData, sentTime);
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::StreamDataBlockedFrame: {
|
|
const StreamDataBlockedFrame& streamBlockedFrame =
|
|
*frame.asStreamDataBlockedFrame();
|
|
VLOG(10) << nodeToString(conn.nodeType)
|
|
<< " sent blocked stream frame packetNum=" << packetNum << " "
|
|
<< conn;
|
|
retransmittable = true;
|
|
conn.streamManager->removeBlocked(streamBlockedFrame.streamId);
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::PingFrame:
|
|
conn.pendingEvents.sendPing = false;
|
|
isPing = true;
|
|
retransmittable = true;
|
|
conn.numPingFramesSent++;
|
|
break;
|
|
case QuicWriteFrame::Type::QuicSimpleFrame: {
|
|
const QuicSimpleFrame& simpleFrame = *frame.asQuicSimpleFrame();
|
|
retransmittable = true;
|
|
// We don't want this triggered for cloned frames.
|
|
if (!clonedPacketIdentifier.has_value()) {
|
|
updateSimpleFrameOnPacketSent(conn, simpleFrame);
|
|
}
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::PaddingFrame: {
|
|
// do not mark padding as retransmittable. There are several reasons
|
|
// for this:
|
|
// 1. We might need to pad ACK packets to make it so that we can
|
|
// sample them correctly for header encryption. ACK packets may not
|
|
// count towards congestion window, so the padding frames in those
|
|
// ack packets should not count towards the window either
|
|
// 2. Of course we do not want to retransmit the ACK frames.
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::DatagramFrame: {
|
|
// do not mark Datagram frames as retransmittable
|
|
break;
|
|
}
|
|
case QuicWriteFrame::Type::ImmediateAckFrame: {
|
|
// turn off the immediate ack pending event.
|
|
conn.pendingEvents.requestImmediateAck = false;
|
|
retransmittable = true;
|
|
break;
|
|
}
|
|
default:
|
|
retransmittable = true;
|
|
}
|
|
}
|
|
|
|
// This increments the next packet number and (potentially) the next non-DSR
|
|
// packet sequence number. Capture the non DSR sequence number before
|
|
// increment.
|
|
auto nonDsrPacketSequenceNumber =
|
|
getAckState(conn, packetNumberSpace).nonDsrPacketSequenceNumber;
|
|
increaseNextPacketNum(conn, packetNumberSpace, isDSRPacket);
|
|
conn.lossState.largestSent =
|
|
std::max(conn.lossState.largestSent.value_or(packetNum), packetNum);
|
|
// updateConnection may be called multiple times during write. If before or
|
|
// during any updateConnection, setLossDetectionAlarm is already set, we
|
|
// shouldn't clear it:
|
|
if (!conn.pendingEvents.setLossDetectionAlarm) {
|
|
conn.pendingEvents.setLossDetectionAlarm = retransmittable;
|
|
}
|
|
conn.lossState.maybeLastPacketSentTime = sentTime;
|
|
conn.lossState.totalBytesSent += encodedSize;
|
|
conn.lossState.totalBodyBytesSent += encodedBodySize;
|
|
conn.lossState.totalPacketsSent++;
|
|
conn.lossState.totalStreamBytesSent += streamBytesSent;
|
|
conn.lossState.totalNewStreamBytesSent += newStreamBytesSent;
|
|
|
|
// Count the number of packets sent in the current phase.
|
|
// This is used to initiate key updates if enabled.
|
|
if (packet.header.getProtectionType() == conn.oneRttWritePhase) {
|
|
if (conn.oneRttWritePendingVerification &&
|
|
conn.oneRttWritePacketsSentInCurrentPhase == 0) {
|
|
// This is the first packet in the new phase after we have initiated a key
|
|
// update. We need to keep track of it to confirm the peer acks it in the
|
|
// same phase.
|
|
conn.oneRttWritePendingVerificationPacketNumber =
|
|
packet.header.getPacketSequenceNum();
|
|
}
|
|
conn.oneRttWritePacketsSentInCurrentPhase++;
|
|
}
|
|
|
|
if (!retransmittable && !isPing) {
|
|
DCHECK(!clonedPacketIdentifier);
|
|
return folly::unit;
|
|
}
|
|
conn.lossState.totalAckElicitingPacketsSent++;
|
|
|
|
auto packetIt =
|
|
std::find_if(
|
|
conn.outstandings.packets.rbegin(),
|
|
conn.outstandings.packets.rend(),
|
|
[packetNum](const auto& packetWithTime) {
|
|
return packetWithTime.packet.header.getPacketSequenceNum() <
|
|
packetNum;
|
|
})
|
|
.base();
|
|
|
|
std::function<void(const quic::OutstandingPacketWrapper&)> packetDestroyFn =
|
|
[&conn](const quic::OutstandingPacketWrapper& pkt) {
|
|
for (auto& packetProcessor : conn.packetProcessors) {
|
|
packetProcessor->onPacketDestroyed(pkt);
|
|
}
|
|
};
|
|
|
|
auto& pkt = *conn.outstandings.packets.emplace(
|
|
packetIt,
|
|
std::move(packet),
|
|
sentTime,
|
|
encodedSize,
|
|
encodedBodySize,
|
|
// these numbers should all _include_ the current packet
|
|
// conn.lossState.inflightBytes isn't updated until below
|
|
// conn.outstandings.numOutstanding() + 1 since we're emplacing here
|
|
conn.lossState.totalBytesSent,
|
|
conn.lossState.inflightBytes + encodedSize,
|
|
conn.lossState,
|
|
conn.writeCount,
|
|
std::move(detailsPerStream),
|
|
conn.appLimitedTracker.getTotalAppLimitedTime(),
|
|
std::move(packetDestroyFn));
|
|
|
|
maybeAddPacketMark(conn, pkt);
|
|
|
|
pkt.isAppLimited = conn.congestionController
|
|
? conn.congestionController->isAppLimited()
|
|
: false;
|
|
if (conn.lossState.lastAckedTime.has_value() &&
|
|
conn.lossState.lastAckedPacketSentTime.has_value()) {
|
|
pkt.lastAckedPacketInfo.emplace(
|
|
*conn.lossState.lastAckedPacketSentTime,
|
|
*conn.lossState.lastAckedTime,
|
|
*conn.lossState.adjustedLastAckedTime,
|
|
conn.lossState.totalBytesSentAtLastAck,
|
|
conn.lossState.totalBytesAckedAtLastAck);
|
|
}
|
|
if (clonedPacketIdentifier) {
|
|
DCHECK(conn.outstandings.clonedPacketIdentifiers.count(
|
|
*clonedPacketIdentifier));
|
|
pkt.maybeClonedPacketIdentifier = std::move(clonedPacketIdentifier);
|
|
conn.lossState.totalBytesCloned += encodedSize;
|
|
}
|
|
pkt.isDSRPacket = isDSRPacket;
|
|
if (isDSRPacket) {
|
|
++conn.outstandings.dsrCount;
|
|
QUIC_STATS(conn.statsCallback, onDSRPacketSent, encodedSize);
|
|
} else {
|
|
// If it's not a DSR packet, set the sequence number to the previous one,
|
|
// as the state currently is the _next_ one after this packet.
|
|
pkt.nonDsrPacketSequenceNumber = nonDsrPacketSequenceNumber;
|
|
}
|
|
|
|
if (conn.congestionController) {
|
|
conn.congestionController->onPacketSent(pkt);
|
|
}
|
|
if (conn.pacer) {
|
|
conn.pacer->onPacketSent();
|
|
}
|
|
for (auto& packetProcessor : conn.packetProcessors) {
|
|
packetProcessor->onPacketSent(pkt);
|
|
}
|
|
|
|
if (conn.pathValidationLimiter &&
|
|
(conn.pendingEvents.pathChallenge || conn.outstandingPathValidation)) {
|
|
conn.pathValidationLimiter->onPacketSent(pkt.metadata.encodedSize);
|
|
}
|
|
conn.lossState.lastRetransmittablePacketSentTime = pkt.metadata.time;
|
|
if (pkt.maybeClonedPacketIdentifier) {
|
|
++conn.outstandings.clonedPacketCount[packetNumberSpace];
|
|
++conn.lossState.timeoutBasedRtxCount;
|
|
} else {
|
|
++conn.outstandings.packetCount[packetNumberSpace];
|
|
}
|
|
return folly::unit;
|
|
}
|
|
|
|
// Returns how many bytes a probe packet may use on this connection.
// Probes are not bounded by congestion control, but an unvalidated client
// path may still cap the send budget via
// maybeUnvalidatedClientWritableBytes().
uint64_t probePacketWritableBytes(QuicConnectionStateBase& conn) {
  const uint64_t availableBytes = maybeUnvalidatedClientWritableBytes(conn);
  if (availableBytes == 0) {
    // Record that a probe was blocked by the writable-bytes limit.
    ++conn.numProbesWritableBytesLimited;
  }
  return availableBytes;
}
|
|
|
|
// Computes the congestion-control-constrained send budget for the
// connection. The budget is the minimum of: the path validation rate
// limiter's credit (while a path challenge is pending/outstanding), the
// unvalidated client writable bytes limit, the congestion controller's
// writable bytes, and any throttling signal's token budget. The result is
// rounded up to a whole number of UDP packets unless it is unlimited.
uint64_t congestionControlWritableBytes(QuicConnectionStateBase& conn) {
  constexpr uint64_t kNoLimit = std::numeric_limits<uint64_t>::max();
  uint64_t limit = kNoLimit;

  if (conn.pendingEvents.pathChallenge || conn.outstandingPathValidation) {
    CHECK(conn.pathValidationLimiter);
    // 0-RTT and path validation rate limiting should be mutually exclusive.
    CHECK(!conn.writableBytesLimit);

    // Use the default RTT measurement when starting a new path challenge (CC
    // is reset). This shouldn't be an RTT sample, so we do not update the CC
    // with this value.
    const auto rttForCredit =
        conn.lossState.srtt == 0us ? kDefaultInitialRtt : conn.lossState.srtt;
    limit = conn.pathValidationLimiter->currentCredit(
        std::chrono::steady_clock::now(), rttForCredit);
  } else if (conn.writableBytesLimit) {
    limit = maybeUnvalidatedClientWritableBytes(conn);
  }

  if (conn.congestionController) {
    limit =
        std::min<uint64_t>(limit, conn.congestionController->getWritableBytes());

    if (conn.throttlingSignalProvider) {
      const auto& maybeSignal =
          conn.throttlingSignalProvider->getCurrentThrottlingSignal();
      if (maybeSignal.has_value() &&
          maybeSignal->maybeBytesToSend.has_value()) {
        // Cap the writable bytes by the amount of tokens available in the
        // throttler's bucket if one found to be throttling the connection.
        limit = std::min(maybeSignal->maybeBytesToSend.value(), limit);
      }
    }
  }

  if (limit == kNoLimit) {
    return limit;
  }

  // For real-CC/PathChallenge cases, round the result up to the nearest
  // multiple of udpSendPacketLen.
  return (limit + conn.udpSendPacketLen - 1) / conn.udpSendPacketLen *
      conn.udpSendPacketLen;
}
|
|
|
|
// WritableBytesFunc used when the scheduler's output must not be subject to
// any send-budget cap: always reports an unlimited budget.
uint64_t unlimitedWritableBytes(QuicConnectionStateBase& /* conn */) {
  constexpr uint64_t kUnlimited = std::numeric_limits<uint64_t>::max();
  return kUnlimited;
}
|
|
|
|
// Returns a HeaderBuilder that stamps out LongHeaders of the given long
// header type. Only the type is captured; all other header fields are
// supplied per packet by the caller.
HeaderBuilder LongHeaderBuilder(LongHeader::Types packetType) {
  return [type = packetType](
             const ConnectionId& srcConnId,
             const ConnectionId& dstConnId,
             PacketNum packetNum,
             QuicVersion version,
             const std::string& token) {
    return LongHeader(type, srcConnId, dstConnId, packetNum, version, token);
  };
}
|
|
|
|
// Returns a HeaderBuilder that produces ShortHeaders in the given key phase.
// Short headers only need the destination connection id and packet number;
// the source connection id, version, and token arguments are ignored.
HeaderBuilder ShortHeaderBuilder(ProtectionType keyPhase) {
  return [phase = keyPhase](
             const ConnectionId& /* srcConnId */,
             const ConnectionId& dstConnId,
             PacketNum packetNum,
             QuicVersion /* version */,
             const std::string& /* token */) {
    return ShortHeader(phase, dstConnId, packetNum);
  };
}
|
|
|
|
/**
 * Writes crypto (handshake) stream data and ACKs for the given long header
 * packet type, including any pending probe packets for that packet number
 * space.
 *
 * Order of operations:
 *  1. If probes are pending and there is data to probe with, write probes
 *     first (probes may exceed packetLimit).
 *  2. Write regular crypto/ACK packets, capped by the (probe-adjusted)
 *     packetLimit.
 *  3. Optionally clone just-written packets for immediate retransmission of
 *     Initial packets, if enabled in transport settings.
 *
 * Returns the aggregate packets/probes/bytes written, or a QuicError if any
 * of the underlying writes fail.
 */
folly::Expected<WriteQuicDataResult, QuicError> writeCryptoAndAckDataToSocket(
    QuicAsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    LongHeader::Types packetType,
    Aead& cleartextCipher,
    const PacketNumberCipher& headerCipher,
    QuicVersion version,
    uint64_t packetLimit,
    const std::string& token) {
  auto encryptionLevel = protectionTypeToEncryptionLevel(
      longHeaderTypeToProtectionType(packetType));
  // Scheduler limited to ACK and crypto frames only.
  FrameScheduler scheduler =
      std::move(FrameScheduler::Builder(
                    connection,
                    encryptionLevel,
                    LongHeader::typeToPacketNumberSpace(packetType),
                    "CryptoAndAcksScheduler")
                    .ackFrames()
                    .cryptoFrames())
          .build();
  auto builder = LongHeaderBuilder(packetType);
  WriteQuicDataResult result;
  // References into `result` so the helpers below accumulate directly into
  // the returned struct.
  auto& packetsWritten = result.packetsWritten;
  auto& bytesWritten = result.bytesWritten;
  auto& probesWritten = result.probesWritten;
  auto& cryptoStream =
      *getCryptoStream(*connection.cryptoState, encryptionLevel);
  auto& numProbePackets =
      connection.pendingEvents
          .numProbePackets[LongHeader::typeToPacketNumberSpace(packetType)];
  // Probes are only useful if there is something to (re)send: either
  // retransmittable crypto data or fresh scheduler data.
  if (numProbePackets &&
      (cryptoStream.retransmissionBuffer.size() || scheduler.hasData())) {
    auto probeResult = writeProbingDataToSocket(
        sock,
        connection,
        srcConnId,
        dstConnId,
        builder,
        encryptionLevel,
        LongHeader::typeToPacketNumberSpace(packetType),
        scheduler,
        numProbePackets, // This possibly bypasses the packetLimit.
        cleartextCipher,
        headerCipher,
        version,
        token);
    if (probeResult.hasError()) {
      return folly::makeUnexpected(probeResult.error());
    }
    probesWritten += probeResult->probesWritten;
    bytesWritten += probeResult->bytesWritten;
  }
  // Deduct probes from the remaining budget, saturating at zero since probes
  // may have exceeded packetLimit.
  packetLimit = probesWritten > packetLimit ? 0 : (packetLimit - probesWritten);
  // Only get one chance to write probes.
  numProbePackets = 0;
  // Crypto data is written without aead protection.
  auto writeResult = writeConnectionDataToSocket(
      sock,
      connection,
      srcConnId,
      dstConnId,
      builder,
      LongHeader::typeToPacketNumberSpace(packetType),
      scheduler,
      congestionControlWritableBytes,
      packetLimit - packetsWritten,
      cleartextCipher,
      headerCipher,
      version,
      Clock::now(),
      token);

  if (writeResult.hasError()) {
    return folly::makeUnexpected(writeResult.error());
  }

  packetsWritten += writeResult->packetsWritten;
  bytesWritten += writeResult->bytesWritten;

  // Optionally clone what was just written as extra probes, to proactively
  // retransmit Initial packets without waiting for loss detection.
  if (connection.transportSettings.immediatelyRetransmitInitialPackets &&
      packetsWritten > 0 && packetsWritten < packetLimit) {
    auto remainingLimit = packetLimit - packetsWritten;
    auto cloneResult = writeProbingDataToSocket(
        sock,
        connection,
        srcConnId,
        dstConnId,
        builder,
        encryptionLevel,
        LongHeader::typeToPacketNumberSpace(packetType),
        scheduler,
        // Clone at most as many packets as were written, bounded by the
        // remaining budget.
        packetsWritten < remainingLimit ? packetsWritten : remainingLimit,
        cleartextCipher,
        headerCipher,
        version,
        token);
    if (cloneResult.hasError()) {
      return folly::makeUnexpected(cloneResult.error());
    }
    probesWritten += cloneResult->probesWritten;
    bytesWritten += cloneResult->bytesWritten;
  }

  VLOG_IF(10, packetsWritten || probesWritten)
      << nodeToString(connection.nodeType)
      << " written crypto and acks data type=" << packetType
      << " packetsWritten=" << packetsWritten
      << " probesWritten=" << probesWritten << connection;
  CHECK_GE(packetLimit, packetsWritten);
  return result;
}
|
|
|
|
/**
 * Writes all pending QUIC data (including the crypto stream) to the socket.
 *
 * Thin wrapper around writeQuicDataToSocketImpl with
 * exceptCryptoStream=false; see that function for the actual write logic.
 */
folly::Expected<WriteQuicDataResult, QuicError> writeQuicDataToSocket(
    QuicAsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    QuicVersion version,
    uint64_t packetLimit,
    TimePoint writeLoopBeginTime) {
  return writeQuicDataToSocketImpl(
      sock,
      connection,
      srcConnId,
      dstConnId,
      aead,
      headerCipher,
      version,
      packetLimit,
      /*exceptCryptoStream=*/false,
      writeLoopBeginTime);
}
|
|
|
|
/**
 * Writes all pending QUIC data EXCEPT the crypto stream to the socket.
 *
 * Thin wrapper around writeQuicDataToSocketImpl with
 * exceptCryptoStream=true; the write loop begin time is taken as "now".
 */
folly::Expected<WriteQuicDataResult, QuicError>
writeQuicDataExceptCryptoStreamToSocket(
    QuicAsyncUDPSocket& socket,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    QuicVersion version,
    uint64_t packetLimit) {
  return writeQuicDataToSocketImpl(
      socket,
      connection,
      srcConnId,
      dstConnId,
      aead,
      headerCipher,
      version,
      packetLimit,
      /*exceptCryptoStream=*/true,
      Clock::now());
}
|
|
|
|
/**
 * Writes 0-RTT application data (stream, reset, window update, blocked, and
 * simple frames) to the socket in ZeroRtt long header packets.
 *
 * Returns the number of packets written, or a QuicError on write failure.
 */
folly::Expected<uint64_t, QuicError> writeZeroRttDataToSocket(
    QuicAsyncUDPSocket& socket,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    QuicVersion version,
    uint64_t packetLimit) {
  auto type = LongHeader::Types::ZeroRtt;
  auto encryptionLevel =
      protectionTypeToEncryptionLevel(longHeaderTypeToProtectionType(type));
  auto builder = LongHeaderBuilder(type);
  // Probe is not useful for zero rtt because we will always have handshake
  // packets outstanding when sending zero rtt data.
  FrameScheduler scheduler =
      std::move(FrameScheduler::Builder(
                    connection,
                    encryptionLevel,
                    LongHeader::typeToPacketNumberSpace(type),
                    "ZeroRttScheduler")
                    .streamFrames()
                    .resetFrames()
                    .windowUpdateFrames()
                    .blockedFrames()
                    .simpleFrames())
          .build();
  auto writeResult = writeConnectionDataToSocket(
      socket,
      connection,
      srcConnId,
      dstConnId,
      std::move(builder),
      LongHeader::typeToPacketNumberSpace(type),
      scheduler,
      congestionControlWritableBytes,
      packetLimit,
      aead,
      headerCipher,
      version,
      Clock::now());

  if (writeResult.hasError()) {
    return folly::makeUnexpected(writeResult.error());
  }

  auto written = writeResult->packetsWritten;
  VLOG_IF(10, written > 0) << nodeToString(connection.nodeType)
                           << " written zero rtt data, packets=" << written
                           << " " << connection;
  DCHECK_GE(packetLimit, written);
  return written;
}
|
|
|
|
/**
 * Builds, encrypts, and best-effort sends a single packet carrying a
 * CONNECTION_CLOSE frame with the given header.
 *
 * If closeDetails is not set, a NO_ERROR close is sent. Local error codes
 * are never put on the wire; they are mapped to INTERNAL_ERROR. Client
 * Initial close packets are padded to the full packet size.
 *
 * Improvement over the previous version: the three identical
 * writeFrame/error-check/assign sequences in the error-code switch are
 * factored into one local helper, so the error handling exists in exactly
 * one place.
 */
void writeCloseCommon(
    QuicAsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    PacketHeader&& header,
    Optional<QuicError> closeDetails,
    const Aead& aead,
    const PacketNumberCipher& headerCipher) {
  // close is special, we're going to bypass all the packet sent logic for all
  // packets we send with a connection close frame.
  PacketNumberSpace pnSpace = header.getPacketNumberSpace();
  HeaderForm headerForm = header.getHeaderForm();
  PacketNum packetNum = header.getPacketSequenceNum();

  // Create a buffer onto which we write the connection close.
  BufAccessor bufAccessor(connection.udpSendPacketLen);
  InplaceQuicPacketBuilder packetBuilder(
      bufAccessor,
      connection.udpSendPacketLen,
      header,
      getAckState(connection, pnSpace).largestAckedByPeer.value_or(0));
  auto encodeResult = packetBuilder.encodePacketHeader();
  if (encodeResult.hasError()) {
    LOG(ERROR) << "Error encoding packet header: "
               << encodeResult.error().message;
    return;
  }
  packetBuilder.accountForCipherOverhead(aead.getCipherOverhead());

  // Shared write path for every close-frame variant: writes the frame,
  // records the bytes written, and reports failure uniformly.
  size_t written = 0;
  auto writeCloseFrame = [&](ConnectionCloseFrame frame) -> bool {
    auto writeResult = writeFrame(std::move(frame), packetBuilder);
    if (writeResult.hasError()) {
      LOG(ERROR) << "Error writing frame: " << writeResult.error().message;
      return false;
    }
    written = *writeResult;
    return true;
  };

  bool frameWritten = false;
  if (!closeDetails) {
    frameWritten = writeCloseFrame(ConnectionCloseFrame(
        QuicErrorCode(TransportErrorCode::NO_ERROR), std::string("No error")));
  } else {
    switch (closeDetails->code.type()) {
      case QuicErrorCode::Type::ApplicationErrorCode:
        frameWritten = writeCloseFrame(ConnectionCloseFrame(
            QuicErrorCode(*closeDetails->code.asApplicationErrorCode()),
            closeDetails->message,
            quic::FrameType::CONNECTION_CLOSE_APP_ERR));
        break;
      case QuicErrorCode::Type::TransportErrorCode:
        frameWritten = writeCloseFrame(ConnectionCloseFrame(
            QuicErrorCode(*closeDetails->code.asTransportErrorCode()),
            closeDetails->message,
            quic::FrameType::CONNECTION_CLOSE));
        break;
      case QuicErrorCode::Type::LocalErrorCode:
        // Local error codes are internal-only; do not leak them to the peer.
        frameWritten = writeCloseFrame(ConnectionCloseFrame(
            QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
            std::string("Internal error"),
            quic::FrameType::CONNECTION_CLOSE));
        break;
    }
  }
  if (!frameWritten) {
    return;
  }

  // Client Initial packets must be padded to the full datagram size.
  if (pnSpace == PacketNumberSpace::Initial &&
      connection.nodeType == QuicNodeType::Client) {
    while (packetBuilder.remainingSpaceInPkt() > 0) {
      auto paddingResult = writeFrame(PaddingFrame(), packetBuilder);
      if (paddingResult.hasError()) {
        LOG(ERROR) << "Error writing padding frame: "
                   << paddingResult.error().message;
        return;
      }
    }
  }
  if (written == 0) {
    LOG(ERROR) << "Close frame too large " << connection;
    return;
  }
  auto packet = std::move(packetBuilder).buildPacket();
  CHECK_GE(packet.body.tailroom(), aead.getCipherOverhead());
  auto bufUniquePtr = packet.body.clone();
  bufUniquePtr =
      aead.inplaceEncrypt(std::move(bufUniquePtr), &packet.header, packetNum);
  bufUniquePtr->coalesce();
  encryptPacketHeader(
      headerForm,
      packet.header.writableData(),
      packet.header.length(),
      bufUniquePtr->data(),
      bufUniquePtr->length(),
      headerCipher);
  folly::IOBuf packetBuf(std::move(packet.header));
  packetBuf.prependChain(std::move(bufUniquePtr));
  auto packetSize = packetBuf.computeChainDataLength();
  if (connection.qLogger) {
    connection.qLogger->addPacket(packet.packet, packetSize);
  }
  VLOG(10) << nodeToString(connection.nodeType)
           << " sent close packetNum=" << packetNum << " in space=" << pnSpace
           << " " << connection;
  // Increment the sequence number.
  increaseNextPacketNum(connection, pnSpace);
  // best effort writing to the socket, ignore any errors.

  Buf packetBufPtr = packetBuf.clone();
  iovec vec[kNumIovecBufferChains];
  size_t iovec_len = fillIovec(packetBufPtr, vec);
  auto ret = sock.write(connection.peerAddress, vec, iovec_len);
  connection.lossState.totalBytesSent += packetSize;
  if (ret < 0) {
    VLOG(4) << "Error writing connection close " << folly::errnoStr(errno)
            << " " << connection;
  } else {
    QUIC_STATS(connection.statsCallback, onWrite, ret);
  }
}
|
|
|
|
// Sends a CONNECTION_CLOSE in a long header packet of the given type.
// No-ops if the server has not yet bound a connection id (errors can occur
// before a connection id exists).
void writeLongClose(
    QuicAsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    LongHeader::Types headerType,
    Optional<QuicError> closeDetails,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    QuicVersion version) {
  if (!connection.serverConnectionId) {
    // It's possible that servers encountered an error before binding to a
    // connection id.
    return;
  }
  const auto pnSpace = LongHeader::typeToPacketNumberSpace(headerType);
  const auto packetNum = getNextPacketNum(connection, pnSpace);
  LongHeader header(headerType, srcConnId, dstConnId, packetNum, version);
  writeCloseCommon(
      sock,
      connection,
      std::move(header),
      std::move(closeDetails),
      aead,
      headerCipher);
}
|
|
|
|
// Sends a CONNECTION_CLOSE in a short header (1-RTT) packet, using the
// connection's current key phase.
void writeShortClose(
    QuicAsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    const ConnectionId& connId,
    Optional<QuicError> closeDetails,
    const Aead& aead,
    const PacketNumberCipher& headerCipher) {
  const auto packetNum =
      getNextPacketNum(connection, PacketNumberSpace::AppData);
  ShortHeader header(connection.oneRttWritePhase, connId, packetNum);
  writeCloseCommon(
      sock,
      connection,
      std::move(header),
      std::move(closeDetails),
      aead,
      headerCipher);
}
|
|
|
|
/**
 * Applies QUIC header protection in place.
 *
 * A ciphertext sample is taken from the encrypted body at a fixed offset of
 * (kMaxPacketNumEncodingSize - packetNumberLength) bytes — i.e. the sample
 * always starts 4 bytes past where the packet number begins, regardless of
 * the actual packet number length — and used to mask the header's first byte
 * and its packet number bytes.
 *
 * @param headerForm    Short vs. long header (selects the cipher entry point,
 *                      which masks different bits of the first byte).
 * @param header        Mutable pointer to the start of the packet header.
 * @param headerLen     Header length in bytes; the packet number occupies the
 *                      trailing packetNumberLength bytes.
 * @param encryptedBody Pointer to the AEAD-encrypted payload following the
 *                      header.
 * @param bodyLen       Encrypted payload length; must be large enough to
 *                      contain the sample (CHECKed below).
 * @param headerCipher  Cipher used to derive the header-protection mask.
 */
void encryptPacketHeader(
    HeaderForm headerForm,
    uint8_t* header,
    size_t headerLen,
    const uint8_t* encryptedBody,
    size_t bodyLen,
    const PacketNumberCipher& headerCipher) {
  // Header encryption.
  // The packet number length is encoded in the (not yet protected) first
  // header byte.
  auto packetNumberLength = parsePacketNumberLength(*header);
  Sample sample;
  size_t sampleBytesToUse = kMaxPacketNumEncodingSize - packetNumberLength;
  // If there were less than 4 bytes in the packet number, some of the payload
  // bytes will also be skipped during sampling.
  CHECK_GE(bodyLen, sampleBytesToUse + sample.size());
  encryptedBody += sampleBytesToUse;
  memcpy(sample.data(), encryptedBody, sample.size());

  // Byte ranges to be masked: the first header byte and the trailing packet
  // number bytes.
  folly::MutableByteRange initialByteRange(header, 1);
  folly::MutableByteRange packetNumByteRange(
      header + headerLen - packetNumberLength, packetNumberLength);
  if (headerForm == HeaderForm::Short) {
    headerCipher.encryptShortHeader(
        sample, initialByteRange, packetNumByteRange);
  } else {
    headerCipher.encryptLongHeader(
        sample, initialByteRange, packetNumByteRange);
  }
}
|
|
|
|
/**
|
|
 * Writes packets to the socket. This is the function that is called by all
|
|
* the other write*ToSocket() functions.
|
|
*
|
|
* The number of packets written is limited by:
|
|
* - the maximum batch size supported by the underlying writer
|
|
* (`maxBatchSize`)
|
|
* - the `packetLimit` input parameter
|
|
* - the value returned by the `writableBytesFunc` which usually is either the
|
|
* congestion control writable bytes or unlimited writable bytes (if the
|
|
* output of the given scheduler should not be subject to congestion
|
|
* control)
|
|
* - the maximum time to spend in a write loop as specified by
|
|
* `transportSettings.writeLimitRttFraction`
|
|
* - the amount of data available in the provided scheduler.
|
|
*
|
|
* Writing the packets involves:
|
|
* 1. The scheduler which decides the data to write in each packet
|
|
* 2. The IOBufQuicBatch which holds the data output by the scheduler
|
|
* 3. The BatchWriter which writes the data from the IOBufQuicBatch to
|
|
* the socket
|
|
*
|
|
* The IOBufQuicBatch can hold packets either as a chain of IOBufs or as a
|
|
* single contiguous buffer (continuous vs. chained memory datapaths). This also
|
|
* affects the type of BatchWriter used to read the IOBufQuicBatch and write it
|
|
* to the socket.
|
|
*
|
|
* A rough outline of this function is as follows:
|
|
* 1. Make a BatchWriter for the requested batching mode and datapath type.
|
|
* 2. Make an IOBufQuicBatch to hold the data. This owns the BatchWriter created
|
|
* above which it will use to write its data to the socket later.
|
|
* 3. Based upon the selected datapathType, the dataplaneFunc is chosen.
|
|
* 4. The dataplaneFunc is responsible for writing the scheduler's data into the
|
|
* IOBufQuicBatch in the desired format, and calling the IOBufQuicBatch's
|
|
* write() function which wraps around the BatchWriter it owns.
|
|
* 5. Each dataplaneFunc call writes one packet to the IOBufQuicBatch. It is
|
|
* called repeatedly until one of the limits described above is hit.
|
|
* 6. After each packet is written, the connection state is updated to reflect a
|
|
* packet being sent.
|
|
* 7. Once the limit is hit, the IOBufQuicBatch is flushed to give it another
|
|
* chance to write any remaining data to the socket that hasn't already been
|
|
* written in the loop.
|
|
*
|
|
* Note that:
|
|
* - This function does not guarantee that the data is written to the underlying
|
|
* UDP socket buffer.
|
|
* - It only guarantees that packets will be scheduled and written to a
|
|
* IOBufQuicBatch and that the IOBufQuicBatch will get a chance to write to
|
|
* the socket.
|
|
* - Step 6 above updates the connection state when the packet is written to the
|
|
* buffer, but not necessarily when it is written to the socket. This decision
|
|
* is made by the IOBufQuicBatch and its BatchWriter.
|
|
* - This function attempts to flush the IOBufQuicBatch before returning
|
|
* to try to ensure that all scheduled data is written into the socket.
|
|
* - If that flush still fails, the packets are considered written to the
|
|
* network, since currently there is no way to rewind scheduler and connection
|
|
* state after the packets have been written to a batch.
|
|
*/
|
|
// See the long block comment above for the overall design. Briefly: loop
// {schedule a packet -> build/encrypt into the batch -> update connection
// state}, bounded by packetLimit, batch size, writable bytes, and a write
// loop time limit; then flush the batch.
folly::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
    QuicAsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    HeaderBuilder builder,
    PacketNumberSpace pnSpace,
    QuicPacketScheduler& scheduler,
    const WritableBytesFunc& writableBytesFunc,
    uint64_t packetLimit,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    QuicVersion version,
    TimePoint writeLoopBeginTime,
    const std::string& token) {
  if (connection.loopDetectorCallback) {
    connection.writeDebugState.schedulerName = scheduler.name().str();
    connection.writeDebugState.noWriteReason = NoWriteReason::WRITE_OK;
  }

  // Note: if a write is pending, it will be taken over by the batch writer when
  // it's created. So this check has to be done before creating the batch
  // writer.
  bool pendingBufferedWrite = hasBufferedDataToWrite(connection);

  if (!scheduler.hasData() && !pendingBufferedWrite) {
    if (connection.loopDetectorCallback) {
      connection.writeDebugState.noWriteReason = NoWriteReason::EMPTY_SCHEDULER;
    }
    return WriteQuicDataResult{0, 0, 0};
  }

  VLOG(10) << nodeToString(connection.nodeType)
           << " writing data using scheduler=" << scheduler.name() << " "
           << connection;

  // Lazily detect GSO support on first write; the result is cached on the
  // connection.
  if (!connection.gsoSupported.has_value()) {
    connection.gsoSupported = sock.getGSO() >= 0;
    if (!*connection.gsoSupported) {
      if (!useSinglePacketInplaceBatchWriter(
              connection.transportSettings.maxBatchSize,
              connection.transportSettings.dataPathType) &&
          (connection.transportSettings.dataPathType ==
           DataPathType::ContinuousMemory)) {
        // Change data path type to DataPathType::ChainedMemory.
        // Continuous memory data path is only supported with working GSO or
        // SinglePacketInplaceBatchWriter.
        LOG(ERROR) << "Switching data path to ChainedMemory as "
                   << "GSO is not supported on the socket";
        connection.transportSettings.dataPathType = DataPathType::ChainedMemory;
      }
    }
  }

  auto batchWriter = BatchWriterFactory::makeBatchWriter(
      connection.transportSettings.batchingMode,
      connection.transportSettings.maxBatchSize,
      connection.transportSettings.enableWriterBackpressure,
      connection.transportSettings.dataPathType,
      connection,
      *connection.gsoSupported);

  // Happy eyeballs only applies to clients; servers pass a null state.
  auto happyEyeballsState = connection.nodeType == QuicNodeType::Server
      ? nullptr
      : &static_cast<QuicClientConnectionState&>(connection).happyEyeballsState;
  IOBufQuicBatch ioBufBatch(
      std::move(batchWriter),
      sock,
      connection.peerAddress,
      connection.statsCallback,
      happyEyeballsState);

  // If we have a pending write to retry. Flush that first and make sure it
  // succeeds before scheduling any new data.
  if (pendingBufferedWrite) {
    bool flushSuccess = ioBufBatch.flush();
    updateErrnoCount(connection, ioBufBatch);
    if (!flushSuccess) {
      // Could not flush retried data. Return empty write result and wait for
      // next retry.
      return WriteQuicDataResult{0, 0, 0};
    }
  }

  auto batchSize = connection.transportSettings.batchingMode ==
          QuicBatchingMode::BATCHING_MODE_NONE
      ? connection.transportSettings.writeConnectionDataPacketsLimit
      : connection.transportSettings.maxBatchSize;

  uint64_t bytesWritten = 0;
  uint64_t shortHeaderPadding = 0;
  uint64_t shortHeaderPaddingCount = 0;
  // Report stats for everything sent in this call, on every exit path.
  SCOPE_EXIT {
    auto nSent = ioBufBatch.getPktSent();
    if (nSent > 0) {
      QUIC_STATS(connection.statsCallback, onPacketsSent, nSent);
      QUIC_STATS(connection.statsCallback, onWrite, bytesWritten);
      if (shortHeaderPadding > 0) {
        QUIC_STATS(
            connection.statsCallback,
            onShortHeaderPaddingBatch,
            shortHeaderPaddingCount,
            shortHeaderPadding);
      }
    }
  };

  quic::TimePoint sentTime = Clock::now();

  // Main write loop; see the limits described in the block comment above.
  while (scheduler.hasData() && ioBufBatch.getPktSent() < packetLimit &&
         ((ioBufBatch.getPktSent() < batchSize) ||
          writeLoopTimeLimit(writeLoopBeginTime, connection))) {
    auto packetNum = getNextPacketNum(connection, pnSpace);
    auto header = builder(srcConnId, dstConnId, packetNum, version, token);
    // Per-packet budget: one UDP packet at most, minus AEAD overhead.
    uint32_t writableBytes = folly::to<uint32_t>(std::min<uint64_t>(
        connection.udpSendPacketLen, writableBytesFunc(connection)));
    uint64_t cipherOverhead = aead.getCipherOverhead();
    if (writableBytes < cipherOverhead) {
      writableBytes = 0;
    } else {
      writableBytes -= cipherOverhead;
    }

    // Stream write-queue changes are transactional: rolled back unless the
    // packet is successfully built and the connection updated below.
    auto writeQueueTransaction =
        connection.streamManager->writeQueue().beginTransaction();
    auto guard = folly::makeGuard([&] {
      connection.streamManager->writeQueue().rollbackTransaction(
          std::move(writeQueueTransaction));
    });
    const auto& dataPlaneFunc =
        connection.transportSettings.dataPathType == DataPathType::ChainedMemory
        ? iobufChainBasedBuildScheduleEncrypt
        : continuousMemoryBuildScheduleEncrypt;
    auto ret = dataPlaneFunc(
        connection,
        std::move(header),
        pnSpace,
        packetNum,
        cipherOverhead,
        scheduler,
        writableBytes,
        ioBufBatch,
        aead,
        headerCipher);

    // This is a fatal error vs. a build error.
    if (ret.hasError()) {
      return folly::makeUnexpected(ret.error());
    }
    if (!ret->buildSuccess) {
      // If we're returning because we couldn't schedule more packets,
      // make sure we flush the buffer in this function.
      ioBufBatch.flush();
      updateErrnoCount(connection, ioBufBatch);
      return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
    }
    // If we build a packet, we updateConnection(), even if write might have
    // been failed. Because if it builds, a lot of states need to be updated no
    // matter the write result. We are basically treating this case as if we
    // pretend write was also successful but packet is lost somewhere in the
    // network.
    bytesWritten += ret->encodedSize;
    if (ret->result && ret->result->shortHeaderPadding > 0) {
      shortHeaderPaddingCount++;
      shortHeaderPadding += ret->result->shortHeaderPadding;
    }

    auto& result = ret->result;
    // This call to updateConnection will attempt to erase streams from the
    // write queue that have already been removed in QuicPacketScheduler.
    // Removing non-existent streams can be O(N), consider passing the
    // transaction set to skip this step
    auto updateConnResult = updateConnection(
        connection,
        std::move(result->clonedPacketIdentifier),
        std::move(result->packet->packet),
        sentTime,
        folly::to<uint32_t>(ret->encodedSize),
        folly::to<uint32_t>(ret->encodedBodySize),
        false /* isDSRPacket */);
    if (updateConnResult.hasError()) {
      return folly::makeUnexpected(updateConnResult.error());
    }
    // Packet fully accounted for: keep the write-queue changes.
    guard.dismiss();
    connection.streamManager->writeQueue().commitTransaction(
        std::move(writeQueueTransaction));

    // if ioBufBatch.write returns false
    // it is because a flush() call failed
    if (!ret->writeSuccess) {
      if (connection.loopDetectorCallback) {
        connection.writeDebugState.noWriteReason =
            NoWriteReason::SOCKET_FAILURE;
      }
      return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
    }

    if ((connection.transportSettings.batchingMode ==
         QuicBatchingMode::BATCHING_MODE_NONE) &&
        useSinglePacketInplaceBatchWriter(
            connection.transportSettings.maxBatchSize,
            connection.transportSettings.dataPathType)) {
      // With SinglePacketInplaceBatchWriter we always write one packet, and so
      // ioBufBatch needs a flush.
      ioBufBatch.flush();
      updateErrnoCount(connection, ioBufBatch);
    }
  }

  // Ensure that the buffer is flushed before returning
  ioBufBatch.flush();
  updateErrnoCount(connection, ioBufBatch);

  if (connection.transportSettings.dataPathType ==
      DataPathType::ContinuousMemory) {
    CHECK(connection.bufAccessor->ownsBuffer());
    CHECK(
        connection.bufAccessor->length() == 0 &&
        connection.bufAccessor->headroom() == 0);
  }
  return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
}
|
|
|
|
// Writes PTO probe packets to the socket.
//
// First attempts to re-send (clone) previously sent outstanding packets via
// the CloningScheduler; if that yields fewer than the requested number of
// probes, falls back to writing fresh packets that carry either an
// IMMEDIATE_ACK (preferred, when allowed at the AppData level) or a PING to
// elicit an acknowledgment from the peer.
//
// Returns a WriteQuicDataResult whose probesWritten/bytesWritten reflect the
// combined output of both passes, or a QuicError if either write pass fails.
folly::Expected<WriteQuicDataResult, QuicError> writeProbingDataToSocket(
    QuicAsyncUDPSocket& sock,
    QuicConnectionStateBase& connection,
    const ConnectionId& srcConnId,
    const ConnectionId& dstConnId,
    const HeaderBuilder& builder,
    EncryptionLevel encryptionLevel,
    PacketNumberSpace pnSpace,
    FrameScheduler scheduler,
    uint8_t probesToSend,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    QuicVersion version,
    const std::string& token) {
  // Skip a packet number for probing packets to elicit acks
  increaseNextPacketNum(connection, pnSpace);
  CloningScheduler cloningScheduler(
      scheduler, connection, "CloningScheduler", aead.getCipherOverhead());
  auto writeLoopBeginTime = Clock::now();

  // If we have the ability to draw an ACK for AppData, let's send a probe that
  // is just an IMMEDIATE_ACK. Increase the number of probes to do so.
  uint8_t dataProbesToSend = probesToSend;
  if (probesToSend && canSendAckControlFrames(connection) &&
      encryptionLevel == EncryptionLevel::AppData) {
    probesToSend = std::max<uint8_t>(probesToSend, kPacketToSendForPTO);
    // Reserve one probe slot for the IMMEDIATE_ACK-only packet written below.
    dataProbesToSend = probesToSend - 1;
  }
  // First pass: clone outstanding packets as probes.
  auto cloningResult = writeConnectionDataToSocket(
      sock,
      connection,
      srcConnId,
      dstConnId,
      builder,
      pnSpace,
      cloningScheduler,
      connection.transportSettings.enableWritableBytesLimit
          ? probePacketWritableBytes
          : unlimitedWritableBytes,
      dataProbesToSend,
      aead,
      headerCipher,
      version,
      writeLoopBeginTime,
      token);
  if (cloningResult.hasError()) {
    return folly::makeUnexpected(cloningResult.error());
  }
  auto probesWritten = cloningResult->packetsWritten;
  auto bytesWritten = cloningResult->bytesWritten;
  if (probesWritten < probesToSend) {
    // Second pass: cloning did not produce enough probes, so write fresh
    // packets to elicit an ACK.
    // If we can use an IMMEDIATE_ACK, that's better than a PING.
    auto probeSchedulerBuilder = FrameScheduler::Builder(
        connection, encryptionLevel, pnSpace, "ProbeScheduler");
    // Might as well include some ACKs.
    probeSchedulerBuilder.ackFrames();
    if (canSendAckControlFrames(connection) &&
        encryptionLevel == EncryptionLevel::AppData) {
      requestPeerImmediateAck(connection);
      probeSchedulerBuilder.immediateAckFrames();
    } else {
      connection.pendingEvents.sendPing = true;
      probeSchedulerBuilder.pingFrames();
    }
    auto probeScheduler = std::move(probeSchedulerBuilder).build();
    auto probingResult = writeConnectionDataToSocket(
        sock,
        connection,
        srcConnId,
        dstConnId,
        builder,
        pnSpace,
        probeScheduler,
        connection.transportSettings.enableWritableBytesLimit
            ? probePacketWritableBytes
            : unlimitedWritableBytes,
        probesToSend - probesWritten,
        aead,
        headerCipher,
        version,
        writeLoopBeginTime);
    if (probingResult.hasError()) {
      return folly::makeUnexpected(probingResult.error());
    }
    probesWritten += probingResult->packetsWritten;
    bytesWritten += probingResult->bytesWritten;
  }
  VLOG_IF(10, probesWritten > 0)
      << nodeToString(connection.nodeType)
      << " writing probes using scheduler=CloningScheduler " << connection;
  return WriteQuicDataResult{0, probesWritten, bytesWritten};
}
|
|
|
|
// Decides whether (and why) the transport should attempt a write right now.
//
// Checks are strictly ordered by priority: PTO probes and ACKs bypass the
// congestion-control gate; everything else is only considered when the
// congestion controller grants writable bytes. Returns the first applicable
// WriteDataReason, or NO_WRITE.
//
// Not const because congestionControlWritableBytes may mutate
// per-write pacing/stats state on conn.
WriteDataReason shouldWriteData(/*const*/ QuicConnectionStateBase& conn) {
  auto& numProbePackets = conn.pendingEvents.numProbePackets;
  // A probe is only writable if we still hold the cipher for its space.
  bool shouldWriteInitialProbes =
      numProbePackets[PacketNumberSpace::Initial] && conn.initialWriteCipher;
  bool shouldWriteHandshakeProbes =
      numProbePackets[PacketNumberSpace::Handshake] &&
      conn.handshakeWriteCipher;
  bool shouldWriteAppDataProbes =
      numProbePackets[PacketNumberSpace::AppData] && conn.oneRttWriteCipher;
  if (shouldWriteInitialProbes || shouldWriteHandshakeProbes ||
      shouldWriteAppDataProbes) {
    VLOG(10) << nodeToString(conn.nodeType) << " needs write because of PTO"
             << conn;
    return WriteDataReason::PROBES;
  }
  if (hasAckDataToWrite(conn)) {
    VLOG(10) << nodeToString(conn.nodeType) << " needs write because of ACKs "
             << conn;
    return WriteDataReason::ACK;
  }

  // Everything below is subject to congestion control.
  if (!congestionControlWritableBytes(conn)) {
    QUIC_STATS(conn.statsCallback, onCwndBlocked);
    return WriteDataReason::NO_WRITE;
  }

  // A previously batched-but-unflushed write takes precedence over new data.
  if (hasBufferedDataToWrite(conn)) {
    return WriteDataReason::BUFFERED_WRITE;
  }

  return hasNonAckDataToWrite(conn);
}
|
|
|
|
// Returns true when at least one packet-number space has ACKs that must be
// sent out immediately.
bool hasAckDataToWrite(const QuicConnectionStateBase& conn) {
  // hasAcksToSchedule tells us whether we have acks.
  // needsToSendAckImmediately tells us when to schedule the acks. If we don't
  // have an immediate need to schedule the acks then we need to wait till we
  // satisfy a condition where there is immediate need, so we shouldn't
  // consider the acks to be writable.
  bool writeAcks =
      (toWriteInitialAcks(conn) || toWriteHandshakeAcks(conn) ||
       toWriteAppDataAcks(conn));
  VLOG_IF(10, writeAcks) << nodeToString(conn.nodeType)
                         << " needs write because of acks largestAck="
                         << largestAckToSendToString(conn) << " largestSentAck="
                         << largestAckScheduledToString(conn)
                         << " ackTimeoutSet="
                         << conn.pendingEvents.scheduleAckTimeout << " "
                         << conn;
  return writeAcks;
}
|
|
|
|
bool hasBufferedDataToWrite(const QuicConnectionStateBase& conn) {
|
|
return (bool)conn.pendingWriteBatch_.buf;
|
|
}
|
|
|
|
// Returns the highest-priority reason (other than ACKs) for writing data, or
// NO_WRITE if nothing is pending. The ordering of the checks below defines
// the write priority: crypto, then control frames, then stream/datagram data.
WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn) {
  if (cryptoHasWritableData(conn)) {
    VLOG(10) << nodeToString(conn.nodeType)
             << " needs write because of crypto stream" << " " << conn;
    return WriteDataReason::CRYPTO_STREAM;
  }
  // Only a client with a 0-RTT cipher may write app data before the 1-RTT
  // cipher is available.
  if (!conn.oneRttWriteCipher &&
      !(conn.nodeType == QuicNodeType::Client &&
        static_cast<const QuicClientConnectionState&>(conn)
            .zeroRttWriteCipher)) {
    // All the rest of the types of data need either a 1-rtt or 0-rtt cipher to
    // be written.
    return WriteDataReason::NO_WRITE;
  }
  if (!conn.pendingEvents.resets.empty()) {
    return WriteDataReason::RESET;
  }
  if (conn.streamManager->hasWindowUpdates()) {
    return WriteDataReason::STREAM_WINDOW_UPDATE;
  }
  if (conn.pendingEvents.connWindowUpdate) {
    return WriteDataReason::CONN_WINDOW_UPDATE;
  }
  if (conn.streamManager->hasBlocked()) {
    return WriteDataReason::BLOCKED;
  }
  // If we have lost data or flow control + stream data.
  if (conn.streamManager->hasLoss() ||
      (getSendConnFlowControlBytesWire(conn) != 0 &&
       conn.streamManager->hasWritable())) {
    return WriteDataReason::STREAM;
  }
  if (!conn.pendingEvents.frames.empty()) {
    return WriteDataReason::SIMPLE;
  }
  if ((conn.pendingEvents.pathChallenge.has_value())) {
    return WriteDataReason::PATHCHALLENGE;
  }
  if (conn.pendingEvents.sendPing) {
    return WriteDataReason::PING;
  }
  if (!conn.datagramState.writeBuffer.empty()) {
    return WriteDataReason::DATAGRAM;
  }
  return WriteDataReason::NO_WRITE;
}
|
|
|
|
// Queues MAX_STREAMS frames for the peer if the stream manager has decided
// that either the bidirectional or unidirectional remote stream limit should
// be raised.
void maybeSendStreamLimitUpdates(QuicConnectionStateBase& conn) {
  // The bool argument to MaxStreamsFrame selects bidirectional (true) vs
  // unidirectional (false).
  if (auto bidiLimit =
          conn.streamManager->remoteBidirectionalStreamLimitUpdate()) {
    sendSimpleFrame(conn, MaxStreamsFrame(*bidiLimit, true));
  }
  if (auto uniLimit =
          conn.streamManager->remoteUnidirectionalStreamLimitUpdate()) {
    sendSimpleFrame(conn, MaxStreamsFrame(*uniLimit, false));
  }
}
|
|
|
|
// Implicitly acknowledges all outstanding packets in the Initial or Handshake
// packet-number space when that encryption level is discarded (per RFC 9001,
// packets of a dropped level can never be acked by the peer afterwards).
// Builds a synthetic ReadAckFrame covering every outstanding packet in the
// space and runs it through the normal ack-processing path, then clears the
// crypto stream's loss buffer.
void implicitAckCryptoStream(
    QuicConnectionStateBase& conn,
    EncryptionLevel encryptionLevel) {
  auto implicitAckTime = Clock::now();
  auto packetNumSpace = encryptionLevel == EncryptionLevel::Handshake
      ? PacketNumberSpace::Handshake
      : PacketNumberSpace::Initial;
  auto& ackState = getAckState(conn, packetNumSpace);
  AckBlocks ackBlocks;
  ReadAckFrame implicitAck;
  implicitAck.ackDelay = 0ms;
  // Marked implicit so downstream processing (e.g. RTT sampling) can tell
  // this apart from a real peer ACK.
  implicitAck.implicit = true;
  for (const auto& op : conn.outstandings.packets) {
    if (op.packet.header.getPacketNumberSpace() == packetNumSpace) {
      ackBlocks.insert(op.packet.header.getPacketSequenceNum());
    }
  }
  if (ackBlocks.empty()) {
    return;
  }
  // Construct an implicit ack covering the entire range of packets.
  // If some of these have already been ACK'd then processAckFrame
  // should simply ignore them.
  implicitAck.largestAcked = ackBlocks.back().end;
  implicitAck.ackBlocks.emplace_back(
      ackBlocks.front().start, implicitAck.largestAcked);
  auto result = processAckFrame(
      conn,
      packetNumSpace,
      implicitAck,
      [](const auto&) {
        // ackedPacketVisitor. No action needed.
        return folly::unit;
      },
      // ackedFrameVisitor: ack crypto data and fold in any ACK frames that
      // these packets carried.
      [&](auto&, auto& packetFrame) {
        switch (packetFrame.type()) {
          case QuicWriteFrame::Type::WriteCryptoFrame: {
            const WriteCryptoFrame& frame = *packetFrame.asWriteCryptoFrame();
            auto cryptoStream =
                getCryptoStream(*conn.cryptoState, encryptionLevel);
            processCryptoStreamAck(*cryptoStream, frame.offset, frame.len);
            break;
          }
          case QuicWriteFrame::Type::WriteAckFrame: {
            const WriteAckFrame& frame = *packetFrame.asWriteAckFrame();
            commonAckVisitorForAckFrame(ackState, frame);
            break;
          }
          default: {
            // We don't bother checking for valid packets, since these are
            // our outstanding packets.
          }
        }
        return folly::unit;
      },
      // We shouldn't mark anything as lost from the implicit ACK, as it should
      // be ACKing the entire range.
      [](auto&, auto&, auto) {
        LOG(FATAL) << "Got loss from implicit crypto ACK.";
        return folly::unit;
      },
      implicitAckTime);
  // TODO handle error
  CHECK(result.hasValue());
  // Clear out the loss buffer explicitly. The implicit ACK itself will not
  // remove data already in the loss buffer.
  auto cryptoStream = getCryptoStream(*conn.cryptoState, encryptionLevel);
  cryptoStream->lossBuffer.clear();
  CHECK(cryptoStream->retransmissionBuffer.empty());
  // The write buffer should be empty, there's no optional crypto data.
  CHECK(cryptoStream->pendingWrites.empty());
}
|
|
|
|
// Performs the state transitions required when the handshake is confirmed:
// drops the Initial and Handshake ciphers (read and write sides), implicitly
// acks the outstanding packets of each discarded space, and resets the
// corresponding ack states. Order matters: each space's ciphers are dropped
// before its outstanding packets are implicitly acked.
void handshakeConfirmed(QuicConnectionStateBase& conn) {
  // If we've supposedly confirmed the handshake and don't have the 1RTT
  // ciphers installed, we are going to have problems.
  CHECK(conn.oneRttWriteCipher);
  CHECK(conn.oneRttWriteHeaderCipher);
  CHECK(conn.readCodec->getOneRttReadCipher());
  CHECK(conn.readCodec->getOneRttHeaderCipher());
  conn.readCodec->onHandshakeDone(Clock::now());
  // Tear down the Initial encryption level.
  conn.initialWriteCipher.reset();
  conn.initialHeaderCipher.reset();
  conn.readCodec->setInitialReadCipher(nullptr);
  conn.readCodec->setInitialHeaderCipher(nullptr);
  implicitAckCryptoStream(conn, EncryptionLevel::Initial);
  conn.ackStates.initialAckState.reset();
  // Tear down the Handshake encryption level.
  conn.handshakeWriteCipher.reset();
  conn.handshakeWriteHeaderCipher.reset();
  conn.readCodec->setHandshakeReadCipher(nullptr);
  conn.readCodec->setHandshakeHeaderCipher(nullptr);
  implicitAckCryptoStream(conn, EncryptionLevel::Handshake);
  conn.ackStates.handshakeAckState.reset();
}
|
|
|
|
// Returns true while any Initial or Handshake cipher (read or write side) is
// still installed, i.e. the early encryption levels have not been fully
// discarded yet.
bool hasInitialOrHandshakeCiphers(QuicConnectionStateBase& conn) {
  if (conn.initialWriteCipher || conn.handshakeWriteCipher) {
    return true;
  }
  return conn.readCodec->getInitialCipher() != nullptr ||
      conn.readCodec->getHandshakeReadCipher() != nullptr;
}
|
|
|
|
// Returns true when Initial-space ACKs exist and must be sent immediately.
// Requires both the Initial write cipher and the (optional) Initial ack
// state to still be present.
bool toWriteInitialAcks(const quic::QuicConnectionStateBase& conn) {
  if (!conn.initialWriteCipher || !conn.ackStates.initialAckState) {
    return false;
  }
  const auto& ackState = *conn.ackStates.initialAckState;
  return hasAcksToSchedule(ackState) && ackState.needsToSendAckImmediately;
}
|
|
|
|
// Returns true when Handshake-space ACKs exist and must be sent immediately.
// Requires both the Handshake write cipher and the (optional) Handshake ack
// state to still be present.
bool toWriteHandshakeAcks(const quic::QuicConnectionStateBase& conn) {
  if (!conn.handshakeWriteCipher || !conn.ackStates.handshakeAckState) {
    return false;
  }
  const auto& ackState = *conn.ackStates.handshakeAckState;
  return hasAcksToSchedule(ackState) && ackState.needsToSendAckImmediately;
}
|
|
|
|
// Returns true when AppData-space ACKs exist and must be sent immediately.
// Unlike the Initial/Handshake ack states, the AppData ack state is always
// present; only the 1-RTT write cipher gates this check.
bool toWriteAppDataAcks(const quic::QuicConnectionStateBase& conn) {
  if (!conn.oneRttWriteCipher) {
    return false;
  }
  const auto& ackState = conn.ackStates.appDataAckState;
  return hasAcksToSchedule(ackState) && ackState.needsToSendAckImmediately;
}
|
|
|
|
// Installs a new 1-RTT write cipher for the given key phase and resets the
// per-phase sent-packet counter (used to decide when to initiate the next
// key update). The new phase must differ from the current write phase.
void updateOneRttWriteCipher(
    quic::QuicConnectionStateBase& conn,
    std::unique_ptr<Aead> aead,
    ProtectionType oneRttPhase) {
  // Only the two 1-RTT key phases are valid here.
  CHECK(
      oneRttPhase == ProtectionType::KeyPhaseZero ||
      oneRttPhase == ProtectionType::KeyPhaseOne);
  CHECK(oneRttPhase != conn.oneRttWritePhase)
      << "Cannot replace cipher for current write phase";
  conn.oneRttWriteCipher = std::move(aead);
  conn.oneRttWritePhase = oneRttPhase;
  conn.oneRttWritePacketsSentInCurrentPhase = 0;
}
|
|
|
|
// Detects a peer-initiated key update (the codec's current read phase has
// advanced past our write phase) and, if one occurred, rotates our write
// cipher to the peer's phase and stages the next read cipher for the
// following update.
void maybeHandleIncomingKeyUpdate(QuicConnectionStateBase& conn) {
  const auto peerReadPhase = conn.readCodec->getCurrentOneRttReadPhase();
  if (peerReadPhase == conn.oneRttWritePhase) {
    // Phases agree; no key update in flight from the peer.
    return;
  }
  // Peer has initiated a key update.
  updateOneRttWriteCipher(
      conn, conn.handshakeLayer->getNextOneRttWriteCipher(), peerReadPhase);

  conn.readCodec->setNextOneRttReadCipher(
      conn.handshakeLayer->getNextOneRttReadCipher());
}
|
|
|
|
// Initiates a 1-RTT key update when enabled by transport settings and the
// number of packets sent in the current phase exceeds the configured
// threshold. The first update may use a distinct (typically smaller)
// threshold; subsequent updates use keyUpdatePacketCountInterval. The update
// stays unverified until a packet from the new phase is acked (see
// maybeVerifyPendingKeyUpdate).
void maybeInitiateKeyUpdate(QuicConnectionStateBase& conn) {
  if (conn.transportSettings.initiateKeyUpdate) {
    auto packetsBeforeNextUpdate =
        conn.transportSettings.firstKeyUpdatePacketCount
        ? conn.transportSettings.firstKeyUpdatePacketCount.value()
        : conn.transportSettings.keyUpdatePacketCountInterval;

    if ((conn.oneRttWritePacketsSentInCurrentPhase > packetsBeforeNextUpdate) &&
        conn.readCodec->canInitiateKeyUpdate()) {
      QUIC_STATS(conn.statsCallback, onKeyUpdateAttemptInitiated);
      conn.readCodec->advanceOneRttReadPhase();
      // The one-off first-update threshold only applies once.
      conn.transportSettings.firstKeyUpdatePacketCount.reset();

      // Rotate our write cipher into the new phase and stage the next read
      // cipher for the update after this one.
      updateOneRttWriteCipher(
          conn,
          conn.handshakeLayer->getNextOneRttWriteCipher(),
          conn.readCodec->getCurrentOneRttReadPhase());
      conn.readCodec->setNextOneRttReadCipher(
          conn.handshakeLayer->getNextOneRttReadCipher());
      // Signal the transport that a key update has been initiated.
      conn.oneRttWritePendingVerification = true;
      // The packet number that must be acked in the new phase is recorded
      // when the next packet is actually sent.
      conn.oneRttWritePendingVerificationPacketNumber.reset();
    }
  }
}
|
|
|
|
// Verifies a self-initiated key update when one of our post-update packets is
// acked: the ack-carrying packet from the peer must itself be protected with
// the new (current) key phase. Returns a CRYPTO_ERROR if the ack arrived in
// the wrong phase; otherwise returns unit (including when no key update is
// pending or the packet predates the update).
folly::Expected<folly::Unit, QuicError> maybeVerifyPendingKeyUpdate(
    QuicConnectionStateBase& conn,
    const OutstandingPacketWrapper& outstandingPacket,
    const RegularQuicPacket& ackPacket) {
  if (!(protectionTypeToEncryptionLevel(
            outstandingPacket.packet.header.getProtectionType()) ==
        EncryptionLevel::AppData)) {
    // This is not an app data packet. We can't have initiated a key update yet.
    return folly::unit;
  }

  if (conn.oneRttWritePendingVerificationPacketNumber &&
      outstandingPacket.packet.header.getPacketSequenceNum() >=
          conn.oneRttWritePendingVerificationPacketNumber.value()) {
    // There is a pending key update. This packet should be acked in
    // the current phase.
    if (ackPacket.header.getProtectionType() == conn.oneRttWritePhase) {
      // Key update is verified.
      conn.oneRttWritePendingVerificationPacketNumber.reset();
      conn.oneRttWritePendingVerification = false;
    } else {
      return folly::makeUnexpected(QuicError(
          TransportErrorCode::CRYPTO_ERROR,
          "Packet with key update was acked in the wrong phase"));
    }
  }
  return folly::unit;
}
|
|
|
|
// Unfortunately these macros are not defined on all platforms; define
// sentinel values so the socket-option keys below still compile.
// TODO: make this more portable.
|
#if !defined IPV6_HOPLIMIT
|
|
#define IPV6_HOPLIMIT -1
|
|
#endif
|
|
#if !defined IP_TTL
|
|
#define IP_TTL -1
|
|
#endif
|
|
// Add a packet mark to the outstanding packet. Currently only supports
|
|
// TTLD marking.
|
|
void maybeAddPacketMark(
|
|
QuicConnectionStateBase& conn,
|
|
OutstandingPacketWrapper& op) {
|
|
static constexpr folly::SocketOptionKey kHopLimitOptionKey = {
|
|
IPPROTO_IPV6, IPV6_HOPLIMIT};
|
|
static constexpr folly::SocketOptionKey kTTLOptionKey = {IPPROTO_IP, IP_TTL};
|
|
if (!conn.socketCmsgsState.additionalCmsgs.has_value()) {
|
|
return;
|
|
}
|
|
const auto& cmsgs = conn.socketCmsgsState.additionalCmsgs;
|
|
auto it = cmsgs->find(kHopLimitOptionKey);
|
|
if (it != cmsgs->end() && it->second == 255) {
|
|
op.metadata.mark = OutstandingPacketMark::TTLD;
|
|
return;
|
|
}
|
|
it = cmsgs->find(kTTLOptionKey);
|
|
if (it != cmsgs->end() && it->second == 255) {
|
|
op.metadata.mark = OutstandingPacketMark::TTLD;
|
|
}
|
|
}
|
|
|
|
void maybeScheduleAckForCongestionFeedback(
|
|
const ReceivedUdpPacket& receivedPacket,
|
|
AckState& ackState) {
|
|
// If the packet was marked as having encountered congestion, send an ACK
|
|
// immediately to ensure timely response from the peer.
|
|
// Note that the tosValue will be populated only if the enableEcnOnEgress
|
|
// transport setting is enabled.
|
|
if ((receivedPacket.tosValue & 0b11) == 0b11) {
|
|
ackState.needsToSendAckImmediately = true;
|
|
}
|
|
}
|
|
|
|
// Finalizes the ACK-related features negotiated with the peer once transport
// parameters are exchanged: ACK receive timestamps and extended ACK
// features. Extended-ACK bits that we cannot honor locally (ECN counts
// without ingress ECN reading, receive timestamps without negotiated
// timestamp support) are masked off.
void updateNegotiatedAckFeatures(QuicConnectionStateBase& conn) {
  // Timestamps require both sides to have advertised a config.
  bool isAckReceiveTimestampsSupported =
      conn.transportSettings.maybeAckReceiveTimestampsConfigSentToPeer &&
      conn.maybePeerAckReceiveTimestampsConfig;

  uint64_t peerRequestedTimestampsCount =
      conn.maybePeerAckReceiveTimestampsConfig.has_value()
      ? conn.maybePeerAckReceiveTimestampsConfig.value()
            .maxReceiveTimestampsPerAck
      : 0;

  conn.negotiatedAckReceiveTimestampSupport =
      isAckReceiveTimestampsSupported && (peerRequestedTimestampsCount > 0);

  // Intersect the peer's advertised feature bits with our own setting.
  conn.negotiatedExtendedAckFeatures = conn.peerAdvertisedExtendedAckFeatures &
      conn.transportSettings.enableExtendedAckFeatures;
  // Disable the ECN fields if we are not reading them
  if (!conn.transportSettings.readEcnOnIngress) {
    conn.negotiatedExtendedAckFeatures &=
        ~static_cast<ExtendedAckFeatureMaskType>(
            ExtendedAckFeatureMask::ECN_COUNTS);
  }
  // Disable the receive timestamps fields if we have not negotiated receive
  // timestamps support
  if (!conn.negotiatedAckReceiveTimestampSupport) {
    conn.negotiatedExtendedAckFeatures &=
        ~static_cast<ExtendedAckFeatureMaskType>(
            ExtendedAckFeatureMask::RECEIVE_TIMESTAMPS);
  }
}
|
|
|
|
} // namespace quic
|