1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-07-30 14:43:05 +03:00

Remove exception throwing from the stream manager and flow control.

Summary: I started with the QuicStreamManager, but it turns out that the path from the manager up to the close path touches a LOT, and so this is a big diff. The strategy is basically the same everywhere, add a folly::Expected and check it on every function and enforce that with [[nodiscard]]

Reviewed By: kvtsoy

Differential Revision: D72347215

fbshipit-source-id: 452868b541754d2ecab646d6c3cbd6aacf317d7f
This commit is contained in:
Matt Joras
2025-04-07 23:45:33 -07:00
committed by Facebook GitHub Bot
parent 43af96e4f9
commit 67ce39cfdd
60 changed files with 5534 additions and 3893 deletions

View File

@ -116,7 +116,7 @@ uint64_t maybeUnvalidatedClientWritableBytes(
conn.udpSendPacketLen;
}
WriteQuicDataResult writeQuicDataToSocketImpl(
folly::Expected<WriteQuicDataResult, QuicError> writeQuicDataToSocketImpl(
QuicAsyncUDPSocket& sock,
QuicConnectionStateBase& connection,
const ConnectionId& srcConnId,
@ -165,8 +165,11 @@ WriteQuicDataResult writeQuicDataToSocketImpl(
aead,
headerCipher,
version);
probesWritten = probeResult.probesWritten;
bytesWritten += probeResult.bytesWritten;
if (!probeResult.hasValue()) {
return folly::makeUnexpected(probeResult.error());
}
probesWritten = probeResult->probesWritten;
bytesWritten += probeResult->bytesWritten;
// We only get one chance to write out the probes.
numProbePackets = 0;
packetLimit =
@ -209,8 +212,11 @@ WriteQuicDataResult writeQuicDataToSocketImpl(
headerCipher,
version,
writeLoopBeginTime);
packetsWritten += connectionDataResult.packetsWritten;
bytesWritten += connectionDataResult.bytesWritten;
if (!connectionDataResult.hasValue()) {
return folly::makeUnexpected(connectionDataResult.error());
}
packetsWritten += connectionDataResult->packetsWritten;
bytesWritten += connectionDataResult->bytesWritten;
VLOG_IF(10, packetsWritten || probesWritten)
<< nodeToString(connection.nodeType) << " written data "
<< (exceptCryptoStream ? "without crypto data " : "")
@ -633,7 +639,7 @@ bool handleStreamBufMetaWritten(
return false;
}
void updateConnection(
folly::Expected<folly::Unit, QuicError> updateConnection(
QuicConnectionStateBase& conn,
Optional<ClonedPacketIdentifier> clonedPacketIdentifier,
RegularQuicWritePacket packet,
@ -664,8 +670,12 @@ void updateConnection(
case QuicWriteFrame::Type::WriteStreamFrame: {
const WriteStreamFrame& writeStreamFrame = *frame.asWriteStreamFrame();
retransmittable = true;
auto stream = CHECK_NOTNULL(
conn.streamManager->getStream(writeStreamFrame.streamId));
auto streamResult =
conn.streamManager->getStream(writeStreamFrame.streamId);
if (!streamResult) {
return folly::makeUnexpected(streamResult.error());
}
auto stream = streamResult.value();
bool newStreamDataWritten = false;
if (writeStreamFrame.fromBufMeta) {
newStreamDataWritten = handleStreamBufMetaWritten(
@ -687,7 +697,11 @@ void updateConnection(
packetNumberSpace);
}
if (newStreamDataWritten) {
updateFlowControlOnWriteToSocket(*stream, writeStreamFrame.len);
auto flowControlResult =
updateFlowControlOnWriteToSocket(*stream, writeStreamFrame.len);
if (flowControlResult.hasError()) {
return folly::makeUnexpected(flowControlResult.error());
}
maybeWriteBlockAfterSocketWrite(*stream);
maybeWriteDataBlockedAfterSocketWrite(conn);
conn.streamManager->addTx(writeStreamFrame.streamId);
@ -775,8 +789,12 @@ void updateConnection(
case QuicWriteFrame::Type::MaxStreamDataFrame: {
const MaxStreamDataFrame& maxStreamDataFrame =
*frame.asMaxStreamDataFrame();
auto stream = CHECK_NOTNULL(
conn.streamManager->getStream(maxStreamDataFrame.streamId));
auto streamResult =
conn.streamManager->getStream(maxStreamDataFrame.streamId);
if (streamResult.hasError()) {
return folly::makeUnexpected(streamResult.error());
}
auto stream = streamResult.value();
retransmittable = true;
VLOG(10) << nodeToString(conn.nodeType)
<< " sent packet with window update packetNum=" << packetNum
@ -873,7 +891,7 @@ void updateConnection(
if (!retransmittable && !isPing) {
DCHECK(!clonedPacketIdentifier);
return;
return folly::unit;
}
conn.lossState.totalAckElicitingPacketsSent++;
@ -962,6 +980,7 @@ void updateConnection(
} else {
++conn.outstandings.packetCount[packetNumberSpace];
}
return folly::unit;
}
uint64_t probePacketWritableBytes(QuicConnectionStateBase& conn) {
@ -1045,7 +1064,7 @@ HeaderBuilder ShortHeaderBuilder(ProtectionType keyPhase) {
};
}
WriteQuicDataResult writeCryptoAndAckDataToSocket(
folly::Expected<WriteQuicDataResult, QuicError> writeCryptoAndAckDataToSocket(
QuicAsyncUDPSocket& sock,
QuicConnectionStateBase& connection,
const ConnectionId& srcConnId,
@ -1093,8 +1112,11 @@ WriteQuicDataResult writeCryptoAndAckDataToSocket(
headerCipher,
version,
token);
probesWritten += probeResult.probesWritten;
bytesWritten += probeResult.bytesWritten;
if (probeResult.hasError()) {
return folly::makeUnexpected(probeResult.error());
}
probesWritten += probeResult->probesWritten;
bytesWritten += probeResult->bytesWritten;
}
packetLimit = probesWritten > packetLimit ? 0 : (packetLimit - probesWritten);
// Only get one chance to write probes.
@ -1116,8 +1138,12 @@ WriteQuicDataResult writeCryptoAndAckDataToSocket(
Clock::now(),
token);
packetsWritten += writeResult.packetsWritten;
bytesWritten += writeResult.bytesWritten;
if (writeResult.hasError()) {
return folly::makeUnexpected(writeResult.error());
}
packetsWritten += writeResult->packetsWritten;
bytesWritten += writeResult->bytesWritten;
if (connection.transportSettings.immediatelyRetransmitInitialPackets &&
packetsWritten > 0 && packetsWritten < packetLimit) {
@ -1136,8 +1162,11 @@ WriteQuicDataResult writeCryptoAndAckDataToSocket(
headerCipher,
version,
token);
probesWritten += cloneResult.probesWritten;
bytesWritten += cloneResult.bytesWritten;
if (cloneResult.hasError()) {
return folly::makeUnexpected(cloneResult.error());
}
probesWritten += cloneResult->probesWritten;
bytesWritten += cloneResult->bytesWritten;
}
VLOG_IF(10, packetsWritten || probesWritten)
@ -1149,7 +1178,7 @@ WriteQuicDataResult writeCryptoAndAckDataToSocket(
return result;
}
WriteQuicDataResult writeQuicDataToSocket(
folly::Expected<WriteQuicDataResult, QuicError> writeQuicDataToSocket(
QuicAsyncUDPSocket& sock,
QuicConnectionStateBase& connection,
const ConnectionId& srcConnId,
@ -1172,7 +1201,8 @@ WriteQuicDataResult writeQuicDataToSocket(
writeLoopBeginTime);
}
WriteQuicDataResult writeQuicDataExceptCryptoStreamToSocket(
folly::Expected<WriteQuicDataResult, QuicError>
writeQuicDataExceptCryptoStreamToSocket(
QuicAsyncUDPSocket& socket,
QuicConnectionStateBase& connection,
const ConnectionId& srcConnId,
@ -1194,7 +1224,7 @@ WriteQuicDataResult writeQuicDataExceptCryptoStreamToSocket(
Clock::now());
}
uint64_t writeZeroRttDataToSocket(
folly::Expected<uint64_t, QuicError> writeZeroRttDataToSocket(
QuicAsyncUDPSocket& socket,
QuicConnectionStateBase& connection,
const ConnectionId& srcConnId,
@ -1221,21 +1251,26 @@ uint64_t writeZeroRttDataToSocket(
.blockedFrames()
.simpleFrames())
.build();
auto written = writeConnectionDataToSocket(
socket,
connection,
srcConnId,
dstConnId,
std::move(builder),
LongHeader::typeToPacketNumberSpace(type),
scheduler,
congestionControlWritableBytes,
packetLimit,
aead,
headerCipher,
version,
Clock::now())
.packetsWritten;
auto writeResult = writeConnectionDataToSocket(
socket,
connection,
srcConnId,
dstConnId,
std::move(builder),
LongHeader::typeToPacketNumberSpace(type),
scheduler,
congestionControlWritableBytes,
packetLimit,
aead,
headerCipher,
version,
Clock::now());
if (writeResult.hasError()) {
return folly::makeUnexpected(writeResult.error());
}
auto written = writeResult->packetsWritten;
VLOG_IF(10, written > 0) << nodeToString(connection.nodeType)
<< " written zero rtt data, packets=" << written
<< " " << connection;
@ -1487,7 +1522,7 @@ void encryptPacketHeader(
* network, since currently there is no way to rewind scheduler and connection
* state after the packets have been written to a batch.
*/
WriteQuicDataResult writeConnectionDataToSocket(
folly::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
QuicAsyncUDPSocket& sock,
QuicConnectionStateBase& connection,
const ConnectionId& srcConnId,
@ -1516,7 +1551,7 @@ WriteQuicDataResult writeConnectionDataToSocket(
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::EMPTY_SCHEDULER;
}
return {0, 0, 0};
return WriteQuicDataResult{0, 0, 0};
}
VLOG(10) << nodeToString(connection.nodeType)
@ -1567,7 +1602,7 @@ WriteQuicDataResult writeConnectionDataToSocket(
if (!flushSuccess) {
// Could not flush retried data. Return empty write result and wait for
// next retry.
return {0, 0, 0};
return WriteQuicDataResult{0, 0, 0};
}
}
@ -1631,7 +1666,7 @@ WriteQuicDataResult writeConnectionDataToSocket(
// make sure we flush the buffer in this function.
ioBufBatch.flush();
updateErrnoCount(connection, ioBufBatch);
return {ioBufBatch.getPktSent(), 0, bytesWritten};
return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
}
// If we build a packet, we updateConnection(), even if write might have
// been failed. Because if it builds, a lot of states need to be updated no
@ -1645,7 +1680,7 @@ WriteQuicDataResult writeConnectionDataToSocket(
}
auto& result = ret.result;
updateConnection(
auto updateConnResult = updateConnection(
connection,
std::move(result->clonedPacketIdentifier),
std::move(result->packet->packet),
@ -1653,6 +1688,9 @@ WriteQuicDataResult writeConnectionDataToSocket(
folly::to<uint32_t>(ret.encodedSize),
folly::to<uint32_t>(ret.encodedBodySize),
false /* isDSRPacket */);
if (updateConnResult.hasError()) {
return folly::makeUnexpected(updateConnResult.error());
}
// if ioBufBatch.write returns false
// it is because a flush() call failed
@ -1661,7 +1699,7 @@ WriteQuicDataResult writeConnectionDataToSocket(
connection.writeDebugState.noWriteReason =
NoWriteReason::SOCKET_FAILURE;
}
return {ioBufBatch.getPktSent(), 0, bytesWritten};
return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
}
if ((connection.transportSettings.batchingMode ==
@ -1687,10 +1725,10 @@ WriteQuicDataResult writeConnectionDataToSocket(
connection.bufAccessor->length() == 0 &&
connection.bufAccessor->headroom() == 0);
}
return {ioBufBatch.getPktSent(), 0, bytesWritten};
return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
}
WriteQuicDataResult writeProbingDataToSocket(
folly::Expected<WriteQuicDataResult, QuicError> writeProbingDataToSocket(
QuicAsyncUDPSocket& sock,
QuicConnectionStateBase& connection,
const ConnectionId& srcConnId,
@ -1735,8 +1773,11 @@ WriteQuicDataResult writeProbingDataToSocket(
version,
writeLoopBeginTime,
token);
auto probesWritten = cloningResult.packetsWritten;
auto bytesWritten = cloningResult.bytesWritten;
if (cloningResult.hasError()) {
return folly::makeUnexpected(cloningResult.error());
}
auto probesWritten = cloningResult->packetsWritten;
auto bytesWritten = cloningResult->bytesWritten;
if (probesWritten < probesToSend) {
// If we can use an IMMEDIATE_ACK, that's better than a PING.
auto probeSchedulerBuilder = FrameScheduler::Builder(
@ -1768,13 +1809,16 @@ WriteQuicDataResult writeProbingDataToSocket(
headerCipher,
version,
writeLoopBeginTime);
probesWritten += probingResult.packetsWritten;
bytesWritten += probingResult.bytesWritten;
if (probingResult.hasError()) {
return folly::makeUnexpected(probingResult.error());
}
probesWritten += probingResult->packetsWritten;
bytesWritten += probingResult->bytesWritten;
}
VLOG_IF(10, probesWritten > 0)
<< nodeToString(connection.nodeType)
<< " writing probes using scheduler=CloningScheduler " << connection;
return {0, probesWritten, bytesWritten};
return WriteQuicDataResult{0, probesWritten, bytesWritten};
}
WriteDataReason shouldWriteData(/*const*/ QuicConnectionStateBase& conn) {
@ -1917,7 +1961,7 @@ void implicitAckCryptoStream(
implicitAck.largestAcked = ackBlocks.back().end;
implicitAck.ackBlocks.emplace_back(
ackBlocks.front().start, implicitAck.largestAcked);
processAckFrame(
auto result = processAckFrame(
conn,
packetNumSpace,
implicitAck,
@ -1943,13 +1987,17 @@ void implicitAckCryptoStream(
// our outstanding packets.
}
}
return folly::unit;
},
// We shouldn't mark anything as lost from the implicit ACK, as it should
// be ACKing the entire rangee.
[](auto&, auto&, auto) {
LOG(FATAL) << "Got loss from implicit crypto ACK.";
return folly::unit;
},
implicitAckTime);
// TODO handle error
CHECK(result.hasValue());
// Clear our the loss buffer explicitly. The implicit ACK itself will not
// remove data already in the loss buffer.
auto cryptoStream = getCryptoStream(*conn.cryptoState, encryptionLevel);