mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-07-30 14:43:05 +03:00
Remove remaining exception throw from QuicPacketScheduler
Summary: Continuing the theme, mostly involved getting rid of it from the stream writes and connecting two sides of Expecteds. Reviewed By: kvtsoy Differential Revision: D74585096 fbshipit-source-id: d81e6cbbc09981d0f1519494cf87e93e93e892db
This commit is contained in:
committed by
Facebook GitHub Bot
parent
40e6c833fa
commit
5bf490e4b8
@ -319,7 +319,10 @@ FrameScheduler::scheduleFramesForPacket(
|
||||
pingFrameScheduler_->writePing(wrapper);
|
||||
}
|
||||
if (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) {
|
||||
streamFrameScheduler_->writeStreams(wrapper);
|
||||
auto result = streamFrameScheduler_->writeStreams(wrapper);
|
||||
if (result.hasError()) {
|
||||
return folly::makeUnexpected(result.error());
|
||||
}
|
||||
}
|
||||
if (datagramFrameScheduler_ &&
|
||||
datagramFrameScheduler_->hasPendingDatagramFrames()) {
|
||||
@ -386,7 +389,7 @@ folly::StringPiece FrameScheduler::name() const {
|
||||
return name_;
|
||||
}
|
||||
|
||||
bool StreamFrameScheduler::writeStreamLossBuffers(
|
||||
folly::Expected<bool, QuicError> StreamFrameScheduler::writeStreamLossBuffers(
|
||||
PacketBuilderInterface& builder,
|
||||
QuicStreamState& stream) {
|
||||
bool wroteStreamFrame = false;
|
||||
@ -404,8 +407,7 @@ bool StreamFrameScheduler::writeStreamLossBuffers(
|
||||
std::nullopt /* skipLenHint */,
|
||||
stream.groupId);
|
||||
if (res.hasError()) {
|
||||
throw QuicInternalException(
|
||||
res.error().message, *res.error().code.asLocalErrorCode());
|
||||
return folly::makeUnexpected(res.error());
|
||||
}
|
||||
auto dataLen = *res;
|
||||
if (dataLen) {
|
||||
@ -426,19 +428,28 @@ bool StreamFrameScheduler::writeStreamLossBuffers(
|
||||
StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn)
|
||||
: conn_(conn) {}
|
||||
|
||||
StreamFrameScheduler::StreamWriteResult StreamFrameScheduler::writeSingleStream(
|
||||
folly::Expected<StreamFrameScheduler::StreamWriteResult, QuicError>
|
||||
StreamFrameScheduler::writeSingleStream(
|
||||
PacketBuilderInterface& builder,
|
||||
QuicStreamState& stream,
|
||||
uint64_t& connWritableBytes) {
|
||||
StreamWriteResult result = StreamWriteResult::NOT_LIMITED;
|
||||
if (!stream.lossBuffer.empty()) {
|
||||
if (!writeStreamLossBuffers(builder, stream)) {
|
||||
auto writeResult = writeStreamLossBuffers(builder, stream);
|
||||
if (writeResult.hasError()) {
|
||||
return folly::makeUnexpected(writeResult.error());
|
||||
}
|
||||
if (!writeResult.value()) {
|
||||
return StreamWriteResult::PACKET_FULL;
|
||||
}
|
||||
}
|
||||
if (stream.hasWritableData(true)) {
|
||||
if (connWritableBytes > 0 || stream.hasWritableData(false)) {
|
||||
if (!writeStreamFrame(builder, stream, connWritableBytes)) {
|
||||
auto writeResult = writeStreamFrame(builder, stream, connWritableBytes);
|
||||
if (writeResult.hasError()) {
|
||||
return folly::makeUnexpected(writeResult.error());
|
||||
}
|
||||
if (!writeResult.value()) {
|
||||
return StreamWriteResult::PACKET_FULL;
|
||||
}
|
||||
result = (connWritableBytes == 0) ? StreamWriteResult::CONN_FC_LIMITED
|
||||
@ -450,7 +461,7 @@ StreamFrameScheduler::StreamWriteResult StreamFrameScheduler::writeSingleStream(
|
||||
return result;
|
||||
}
|
||||
|
||||
StreamId StreamFrameScheduler::writeStreamsHelper(
|
||||
folly::Expected<StreamId, QuicError> StreamFrameScheduler::writeStreamsHelper(
|
||||
PacketBuilderInterface& builder,
|
||||
const std::set<StreamId>& writableStreams,
|
||||
StreamId nextScheduledStream,
|
||||
@ -466,7 +477,10 @@ StreamId StreamFrameScheduler::writeStreamsHelper(
|
||||
auto stream = conn_.streamManager->findStream(*writableStreamItr);
|
||||
CHECK(stream);
|
||||
auto writeResult = writeSingleStream(builder, *stream, connWritableBytes);
|
||||
if (writeResult == StreamWriteResult::PACKET_FULL) {
|
||||
if (writeResult.hasError()) {
|
||||
return folly::makeUnexpected(writeResult.error());
|
||||
}
|
||||
if (writeResult.value() == StreamWriteResult::PACKET_FULL) {
|
||||
break;
|
||||
}
|
||||
writableStreamItr++;
|
||||
@ -477,7 +491,8 @@ StreamId StreamFrameScheduler::writeStreamsHelper(
|
||||
return *writableStreamItr;
|
||||
}
|
||||
|
||||
void StreamFrameScheduler::writeStreamsHelper(
|
||||
folly::Expected<folly::Unit, QuicError>
|
||||
StreamFrameScheduler::writeStreamsHelper(
|
||||
PacketBuilderInterface& builder,
|
||||
deprecated::PriorityQueue& writableStreams,
|
||||
uint64_t& connWritableBytes,
|
||||
@ -498,12 +513,15 @@ void StreamFrameScheduler::writeStreamsHelper(
|
||||
auto stream = CHECK_NOTNULL(conn_.streamManager->findStream(streamId));
|
||||
if (!stream->hasSchedulableData() && stream->hasSchedulableDsr()) {
|
||||
// We hit a DSR stream
|
||||
return;
|
||||
return folly::unit;
|
||||
}
|
||||
CHECK(stream) << "streamId=" << streamId
|
||||
<< "inc=" << uint64_t(level.incremental);
|
||||
if (writeSingleStream(builder, *stream, connWritableBytes) ==
|
||||
StreamWriteResult::PACKET_FULL) {
|
||||
auto writeResult = writeSingleStream(builder, *stream, connWritableBytes);
|
||||
if (writeResult.hasError()) {
|
||||
return folly::makeUnexpected(writeResult.error());
|
||||
}
|
||||
if (writeResult.value() == StreamWriteResult::PACKET_FULL) {
|
||||
break;
|
||||
}
|
||||
auto remainingSpaceAfter = builder.remainingSpaceInPkt();
|
||||
@ -513,13 +531,15 @@ void StreamFrameScheduler::writeStreamsHelper(
|
||||
bool forceNext = remainingSpaceAfter > 0;
|
||||
level.iterator->next(forceNext);
|
||||
if (streamPerPacket) {
|
||||
return;
|
||||
return folly::unit;
|
||||
}
|
||||
} while (!level.iterator->end());
|
||||
}
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
void StreamFrameScheduler::writeStreamsHelper(
|
||||
folly::Expected<folly::Unit, QuicError>
|
||||
StreamFrameScheduler::writeStreamsHelper(
|
||||
PacketBuilderInterface& builder,
|
||||
PriorityQueue& writableStreams,
|
||||
uint64_t& connWritableBytes,
|
||||
@ -538,13 +558,16 @@ void StreamFrameScheduler::writeStreamsHelper(
|
||||
auto stream = CHECK_NOTNULL(conn_.streamManager->findStream(streamId));
|
||||
if (!stream->hasSchedulableData() && stream->hasSchedulableDsr()) {
|
||||
// We hit a DSR stream
|
||||
return;
|
||||
return folly::unit;
|
||||
}
|
||||
CHECK(stream) << "streamId=" << streamId;
|
||||
// TODO: this is counting STREAM frame overhead against the stream itself
|
||||
auto lastWriteBytes = builder.remainingSpaceInPkt();
|
||||
auto writeResult = writeSingleStream(builder, *stream, connWritableBytes);
|
||||
if (writeResult == StreamWriteResult::PACKET_FULL) {
|
||||
if (writeResult.hasError()) {
|
||||
return folly::makeUnexpected(writeResult.error());
|
||||
}
|
||||
if (writeResult.value() == StreamWriteResult::PACKET_FULL) {
|
||||
break;
|
||||
}
|
||||
auto remainingSpaceAfter = builder.remainingSpaceInPkt();
|
||||
@ -554,7 +577,7 @@ void StreamFrameScheduler::writeStreamsHelper(
|
||||
// we should erase the stream from writableStreams, the caller can rollback
|
||||
// the transaction if the packet write fails
|
||||
if (remainingSpaceAfter > 0) {
|
||||
if (writeResult == StreamWriteResult::CONN_FC_LIMITED) {
|
||||
if (writeResult.value() == StreamWriteResult::CONN_FC_LIMITED) {
|
||||
conn_.streamManager->addConnFCBlockedStream(streamId);
|
||||
}
|
||||
writableStreams.erase(id);
|
||||
@ -562,44 +585,56 @@ void StreamFrameScheduler::writeStreamsHelper(
|
||||
writableStreams.consume(lastWriteBytes);
|
||||
}
|
||||
if (streamPerPacket) {
|
||||
return;
|
||||
return folly::unit;
|
||||
}
|
||||
}
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
|
||||
folly::Expected<folly::Unit, QuicError> StreamFrameScheduler::writeStreams(
|
||||
PacketBuilderInterface& builder) {
|
||||
DCHECK(conn_.streamManager->hasWritable());
|
||||
uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
|
||||
// Write the control streams first as a naive binary priority mechanism.
|
||||
const auto& controlWriteQueue = conn_.streamManager->controlWriteQueue();
|
||||
if (!controlWriteQueue.empty()) {
|
||||
conn_.schedulingState.nextScheduledControlStream = writeStreamsHelper(
|
||||
auto result = writeStreamsHelper(
|
||||
builder,
|
||||
controlWriteQueue,
|
||||
conn_.schedulingState.nextScheduledControlStream,
|
||||
connWritableBytes,
|
||||
conn_.transportSettings.streamFramePerPacket);
|
||||
if (result.hasError()) {
|
||||
return folly::makeUnexpected(result.error());
|
||||
}
|
||||
conn_.schedulingState.nextScheduledControlStream = result.value();
|
||||
}
|
||||
auto* oldWriteQueue = conn_.streamManager->oldWriteQueue();
|
||||
QuicStreamState* nextStream{nullptr};
|
||||
if (oldWriteQueue) {
|
||||
if (!oldWriteQueue->empty()) {
|
||||
writeStreamsHelper(
|
||||
auto result = writeStreamsHelper(
|
||||
builder,
|
||||
*oldWriteQueue,
|
||||
connWritableBytes,
|
||||
conn_.transportSettings.streamFramePerPacket);
|
||||
if (result.hasError()) {
|
||||
return folly::makeUnexpected(result.error());
|
||||
}
|
||||
auto streamId = oldWriteQueue->getNextScheduledStream();
|
||||
nextStream = conn_.streamManager->findStream(streamId);
|
||||
}
|
||||
} else {
|
||||
auto& writeQueue = conn_.streamManager->writeQueue();
|
||||
if (!writeQueue.empty()) {
|
||||
writeStreamsHelper(
|
||||
auto result = writeStreamsHelper(
|
||||
builder,
|
||||
writeQueue,
|
||||
connWritableBytes,
|
||||
conn_.transportSettings.streamFramePerPacket);
|
||||
if (result.hasError()) {
|
||||
return folly::makeUnexpected(result.error());
|
||||
}
|
||||
if (!writeQueue.empty()) {
|
||||
auto id = writeQueue.peekNextScheduledID();
|
||||
CHECK(id.isStreamID());
|
||||
@ -615,6 +650,7 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
|
||||
if (nextStream && !nextStream->hasSchedulableData()) {
|
||||
nextStreamDsr_ = true;
|
||||
}
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
bool StreamFrameScheduler::hasPendingData() const {
|
||||
@ -624,7 +660,7 @@ bool StreamFrameScheduler::hasPendingData() const {
|
||||
getSendConnFlowControlBytesWire(conn_) > 0));
|
||||
}
|
||||
|
||||
bool StreamFrameScheduler::writeStreamFrame(
|
||||
folly::Expected<bool, QuicError> StreamFrameScheduler::writeStreamFrame(
|
||||
PacketBuilderInterface& builder,
|
||||
QuicStreamState& stream,
|
||||
uint64_t& connWritableBytes) {
|
||||
@ -653,8 +689,7 @@ bool StreamFrameScheduler::writeStreamFrame(
|
||||
std::nullopt /* skipLenHint */,
|
||||
stream.groupId);
|
||||
if (res.hasError()) {
|
||||
throw QuicInternalException(
|
||||
res.error().message, *res.error().code.asLocalErrorCode());
|
||||
return folly::makeUnexpected(res.error());
|
||||
}
|
||||
auto dataLen = *res;
|
||||
if (!dataLen) {
|
||||
|
Reference in New Issue
Block a user