
Use new priority queue implementation

Summary:
This adds the new priority queue implementation and a TransportSetting that controls whether it is used.  The default is still the old priority queue, so this diff should not introduce any functional changes in production code.
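For illustration, a minimal sketch of opting in (hypothetical setup code; useNewPriorityQueue is the TransportSetting added in this diff, applied via refreshTransportSettings as in the tests below):

    quic::TransportSettings settings;
    settings.useNewPriorityQueue = true; // default is false: keep the old queue
    // The setting takes effect when transport settings are (re)applied to the
    // stream manager, e.g. through refreshTransportSettings(settings).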

One key difference is that with the new queue, streams with new data that become connection flow control blocked are *removed* from the queue, and added back once more flow control arrives.  I think this will make the scheduler slightly more efficient at writing low-priority loss streams when there is high-priority data but no connection flow control, since it no longer needs to skip over those streams when building the packet.
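A condensed sketch of that flow, using names from the scheduler and stream manager changes in this diff (not the literal code):

    // The stream still has data, but the connection is flow control blocked:
    // park it with the stream manager instead of skipping it on every packet.
    if (writeResult == StreamWriteResult::CONN_FC_LIMITED) {
      conn.streamManager->addConnFCBlockedStream(streamId);
      writableStreams.erase(id);
    }
    // Later, when a MAX_DATA frame raises the connection limit, onMaxData()
    // re-inserts the blocked streams into the write queue.
    conn.streamManager->onMaxData();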

If this diff regresses build size, D72476484 should get it back.

Reviewed By: mjoras

Differential Revision: D72476486

fbshipit-source-id: 9665cf3f66dcdbfd57d2199d5c832529a68cfac0
Alan Frindell
2025-04-17 08:43:19 -07:00
committed by Facebook GitHub Bot
parent 47da181ae8
commit 1e1c7defef
15 changed files with 696 additions and 177 deletions


@@ -426,21 +426,28 @@ bool StreamFrameScheduler::writeStreamLossBuffers(
 StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn)
     : conn_(conn) {}
 
-bool StreamFrameScheduler::writeSingleStream(
+StreamFrameScheduler::StreamWriteResult StreamFrameScheduler::writeSingleStream(
     PacketBuilderInterface& builder,
     QuicStreamState& stream,
     uint64_t& connWritableBytes) {
+  StreamWriteResult result = StreamWriteResult::NOT_LIMITED;
   if (!stream.lossBuffer.empty()) {
     if (!writeStreamLossBuffers(builder, stream)) {
-      return false;
+      return StreamWriteResult::PACKET_FULL;
     }
   }
-  if (stream.hasWritableData() && connWritableBytes > 0) {
-    if (!writeStreamFrame(builder, stream, connWritableBytes)) {
-      return false;
-    }
-  }
-  return true;
+  if (stream.hasWritableData(true)) {
+    if (connWritableBytes > 0 || stream.hasWritableData(false)) {
+      if (!writeStreamFrame(builder, stream, connWritableBytes)) {
+        return StreamWriteResult::PACKET_FULL;
+      }
+      result = (connWritableBytes == 0) ? StreamWriteResult::CONN_FC_LIMITED
+                                        : StreamWriteResult::NOT_LIMITED;
+    } else {
+      result = StreamWriteResult::CONN_FC_LIMITED;
+    }
+  }
+  return result;
 }
 
 StreamId StreamFrameScheduler::writeStreamsHelper(
@@ -458,7 +465,8 @@ StreamId StreamFrameScheduler::writeStreamsHelper(
   while (writableStreamItr != wrapper.cend()) {
     auto stream = conn_.streamManager->findStream(*writableStreamItr);
     CHECK(stream);
-    if (!writeSingleStream(builder, *stream, connWritableBytes)) {
+    auto writeResult = writeSingleStream(builder, *stream, connWritableBytes);
+    if (writeResult == StreamWriteResult::PACKET_FULL) {
      break;
     }
     writableStreamItr++;
@@ -494,7 +502,8 @@ void StreamFrameScheduler::writeStreamsHelper(
     }
     CHECK(stream) << "streamId=" << streamId
                   << "inc=" << uint64_t(level.incremental);
-    if (!writeSingleStream(builder, *stream, connWritableBytes)) {
+    if (writeSingleStream(builder, *stream, connWritableBytes) ==
+        StreamWriteResult::PACKET_FULL) {
       break;
     }
     auto remainingSpaceAfter = builder.remainingSpaceInPkt();
@@ -510,6 +519,54 @@ void StreamFrameScheduler::writeStreamsHelper(
   }
 }
 
+void StreamFrameScheduler::writeStreamsHelper(
+    PacketBuilderInterface& builder,
+    PriorityQueue& writableStreams,
+    uint64_t& connWritableBytes,
+    bool streamPerPacket) {
+  // Fill a packet with non-control stream data, in priority order
+  //
+  // The streams can have loss data or fresh data. Once we run out of
+  // conn flow control, we can only write loss data. In order to
+  // advance the write queue, we have to remove the elements. Store
+  // them in QuicStreamManager and re-insert when more f/c arrives
+  while (!writableStreams.empty() && builder.remainingSpaceInPkt() > 0) {
+    auto id = writableStreams.peekNextScheduledID();
+    // we only support streams here for now
+    CHECK(id.isStreamID());
+    auto streamId = id.asStreamID();
+    auto stream = CHECK_NOTNULL(conn_.streamManager->findStream(streamId));
+    if (!stream->hasSchedulableData() && stream->hasSchedulableDsr()) {
+      // We hit a DSR stream
+      return;
+    }
+    CHECK(stream) << "streamId=" << streamId;
+    // TODO: this is counting STREAM frame overhead against the stream itself
+    auto lastWriteBytes = builder.remainingSpaceInPkt();
+    auto writeResult = writeSingleStream(builder, *stream, connWritableBytes);
+    if (writeResult == StreamWriteResult::PACKET_FULL) {
+      break;
+    }
+    auto remainingSpaceAfter = builder.remainingSpaceInPkt();
+    lastWriteBytes -= remainingSpaceAfter;
+    // If we wrote a stream frame and there's still space in the packet,
+    // that implies we ran out of data or flow control on the stream and
+    // we should erase the stream from writableStreams, the caller can rollback
+    // the transaction if the packet write fails
+    if (remainingSpaceAfter > 0) {
+      if (writeResult == StreamWriteResult::CONN_FC_LIMITED) {
+        conn_.streamManager->addConnFCBlockedStream(streamId);
+      }
+      writableStreams.erase(id);
+    } else { // the loop will break
+      writableStreams.consume(lastWriteBytes);
+    }
+    if (streamPerPacket) {
+      return;
+    }
+  }
+}
+
 void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
   DCHECK(conn_.streamManager->hasWritable());
   uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
@@ -523,23 +580,40 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
         connWritableBytes,
         conn_.transportSettings.streamFramePerPacket);
   }
-  auto& writeQueue = conn_.streamManager->writeQueue();
-  if (!writeQueue.empty()) {
-    writeStreamsHelper(
-        builder,
-        writeQueue,
-        connWritableBytes,
-        conn_.transportSettings.streamFramePerPacket);
-    // If the next non-control stream is DSR, record that fact in the scheduler
-    // so that we don't try to write a non DSR stream again. Note that this
-    // means that in the presence of many large control streams and DSR
-    // streams, we won't completely prioritize control streams but they
-    // will not be starved.
-    auto streamId = writeQueue.getNextScheduledStream();
-    auto stream = conn_.streamManager->findStream(streamId);
-    if (stream && !stream->hasSchedulableData()) {
-      nextStreamDsr_ = true;
-    }
-  }
+  auto* oldWriteQueue = conn_.streamManager->oldWriteQueue();
+  QuicStreamState* nextStream{nullptr};
+  if (oldWriteQueue) {
+    if (!oldWriteQueue->empty()) {
+      writeStreamsHelper(
+          builder,
+          *oldWriteQueue,
+          connWritableBytes,
+          conn_.transportSettings.streamFramePerPacket);
+      auto streamId = oldWriteQueue->getNextScheduledStream();
+      nextStream = conn_.streamManager->findStream(streamId);
+    }
+  } else {
+    auto& writeQueue = conn_.streamManager->writeQueue();
+    if (!writeQueue.empty()) {
+      writeStreamsHelper(
+          builder,
+          writeQueue,
+          connWritableBytes,
+          conn_.transportSettings.streamFramePerPacket);
+      if (!writeQueue.empty()) {
+        auto id = writeQueue.peekNextScheduledID();
+        CHECK(id.isStreamID());
+        nextStream = conn_.streamManager->findStream(id.asStreamID());
+      }
+    }
+  }
+  // If the next non-control stream is DSR, record that fact in the
+  // scheduler so that we don't try to write a non DSR stream again.
+  // Note that this means that in the presence of many large control
+  // streams and DSR streams, we won't completely prioritize control
+  // streams but they will not be starved.
+  if (nextStream && !nextStream->hasSchedulableData()) {
+    nextStreamDsr_ = true;
+  }
 }


@@ -88,14 +88,19 @@ class StreamFrameScheduler {
       QuicStreamState& stream);
 
   /**
-   * Write a single stream's write buffer or loss buffer
+   * Writes a single stream's write buffer or loss buffer.
    *
-   * lossOnly: if only loss buffer should be written. This param may get mutated
-   * inside the function.
+   * @param builder: The packet builder used to construct the packet.
+   * @param stream: The state of the QUIC stream being written.
+   * @param connWritableBytes: The number of writable bytes available in the
+   *                           connection. It can be 0 and still write loss data
+   *                           or stream FIN. Mutated by this function
    *
-   * Return: true if write should continue after this stream, false otherwise.
+   * Return: StreamWriteResult indicating whether the packet is full, connection
+   * flow control limited, or not limited by connection flow control.
    */
-  bool writeSingleStream(
+  enum class StreamWriteResult { PACKET_FULL, NOT_LIMITED, CONN_FC_LIMITED };
+  StreamWriteResult writeSingleStream(
       PacketBuilderInterface& builder,
       QuicStreamState& stream,
       uint64_t& connWritableBytes);
@@ -113,6 +118,12 @@ class StreamFrameScheduler {
       uint64_t& connWritableBytes,
       bool streamPerPacket);
 
+  void writeStreamsHelper(
+      PacketBuilderInterface& builder,
+      PriorityQueue& writableStreams,
+      uint64_t& connWritableBytes,
+      bool streamPerPacket);
+
   /**
    * Helper function to write either stream data if stream is not flow
    * controlled or a blocked frame otherwise.


@@ -831,7 +831,11 @@ QuicTransportBaseLite::setStreamPriority(
   }
   // It's not an error to prioritize a stream after it's sent its FIN - this
   // can reprioritize retransmissions.
-  conn_->streamManager->setStreamPriority(id, priority, conn_->qLogger);
+  conn_->streamManager->setStreamPriority(
+      id,
+      priority,
+      getSendConnFlowControlBytesWire(*conn_) > 0,
+      conn_->qLogger);
   return folly::unit;
 }


@@ -719,7 +719,10 @@ folly::Expected<folly::Unit, QuicError> updateConnection(
         conn.streamManager->addTx(writeStreamFrame.streamId);
         newStreamBytesSent += writeStreamFrame.len;
       }
-      conn.streamManager->updateWritableStreams(*stream);
+      // This call could take an argument whether the packet scheduler already
+      // removed stream from writeQueue
+      conn.streamManager->updateWritableStreams(
+          *stream, getSendConnFlowControlBytesWire(conn) > 0);
       streamBytesSent += writeStreamFrame.len;
       detailsPerStream.addFrame(writeStreamFrame, newStreamDataWritten);
       break;
@@ -1693,6 +1696,12 @@ folly::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
     writableBytes -= cipherOverhead;
   }
 
+  auto writeQueueTransaction =
+      connection.streamManager->writeQueue().beginTransaction();
+  auto guard = folly::makeGuard([&] {
+    connection.streamManager->writeQueue().rollbackTransaction(
+        std::move(writeQueueTransaction));
+  });
   const auto& dataPlaneFunc =
       connection.transportSettings.dataPathType == DataPathType::ChainedMemory
       ? iobufChainBasedBuildScheduleEncrypt
@@ -1732,6 +1741,10 @@ folly::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
     }
     auto& result = ret->result;
+    // This call to updateConnection will attempt to erase streams from the
+    // write queue that have already been removed in QuicPacketScheduler.
+    // Removing non-existent streams can be O(N), consider passing the
+    // transaction set to skip this step
     auto updateConnResult = updateConnection(
         connection,
         std::move(result->clonedPacketIdentifier),
@@ -1743,6 +1756,9 @@ folly::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
     if (updateConnResult.hasError()) {
       return folly::makeUnexpected(updateConnResult.error());
     }
+    guard.dismiss();
+    connection.streamManager->writeQueue().commitTransaction(
+        std::move(writeQueueTransaction));
     // if ioBufBatch.write returns false
     // it is because a flush() call failed
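For reference, the write-queue transaction lifecycle added above, condensed into a sketch (error paths and unrelated work elided):

    auto txn = connection.streamManager->writeQueue().beginTransaction();
    auto guard = folly::makeGuard([&] {
      // Any early return rolls back queue mutations made while scheduling.
      connection.streamManager->writeQueue().rollbackTransaction(std::move(txn));
    });
    // ... build, encrypt, and send the packet; run updateConnection ...
    guard.dismiss(); // success path: keep the mutations
    connection.streamManager->writeQueue().commitTransaction(std::move(txn));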


@@ -106,19 +106,6 @@ PacketNum addOutstandingPacket(QuicConnectionStateBase& conn) {
 
 using namespace quic::test;
 
-std::unique_ptr<QuicClientConnectionState>
-createConn(uint32_t maxStreams, uint64_t maxOffset, uint64_t initialMaxOffset) {
-  auto conn = std::make_unique<QuicClientConnectionState>(
-      FizzClientQuicHandshakeContext::Builder().build());
-  auto result =
-      conn->streamManager->setMaxLocalBidirectionalStreams(maxStreams);
-  CHECK(!result.hasError()) << "Failed to set max local bidirectional streams";
-  conn->flowControlState.peerAdvertisedMaxOffset = maxOffset;
-  conn->flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote =
-      initialMaxOffset;
-  return conn;
-}
-
 auto createStream(
     QuicClientConnectionState& conn,
     std::optional<HTTPPriorityQueue::Priority> priority = std::nullopt) {
@@ -236,13 +223,40 @@ namespace quic::test {
 class QuicPacketSchedulerTestBase {
  public:
   QuicVersion version{QuicVersion::MVFST};
+
+  std::unique_ptr<QuicClientConnectionState> createConn(
+      uint32_t maxStreams,
+      uint64_t maxOffset,
+      uint64_t initialMaxOffset,
+      bool useNewPriorityQueue = false) {
+    auto conn = std::make_unique<QuicClientConnectionState>(
+        FizzClientQuicHandshakeContext::Builder().build());
+    transportSettings.useNewPriorityQueue = useNewPriorityQueue;
+    auto result =
+        conn->streamManager->refreshTransportSettings(transportSettings);
+    CHECK(!result.hasError()) << "Failed to refresh transport settings";
+    result = conn->streamManager->setMaxLocalBidirectionalStreams(maxStreams);
+    CHECK(!result.hasError())
+        << "Failed to set max local bidirectional streams";
+    conn->flowControlState.peerAdvertisedMaxOffset = maxOffset;
+    conn->flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote =
+        initialMaxOffset;
+    return conn;
+  }
+
+  TransportSettings transportSettings;
 };
 
 class QuicPacketSchedulerTest : public QuicPacketSchedulerTestBase,
-                                public testing::Test {
+                                public testing::TestWithParam<bool> {
  public:
   StreamId nextScheduledStreamID(QuicConnectionStateBase& conn) {
-    return conn.streamManager->writeQueue().getNextScheduledStream();
+    auto oldWriteQueue = conn.streamManager->oldWriteQueue();
+    CHECK(oldWriteQueue || GetParam()) << "why old queue when using new";
+    if (oldWriteQueue) {
+      return oldWriteQueue->getNextScheduledStream();
+    }
+    return conn.streamManager->writeQueue().peekNextScheduledID().asStreamID();
   }
 };
@@ -280,7 +294,7 @@ TEST_F(QuicPacketSchedulerTest, CryptoPaddingInitialPacket) {
   EXPECT_EQ(conn.udpSendPacketLen, packetLength);
 }
 
-TEST_F(QuicPacketSchedulerTest, PaddingInitialPureAcks) {
+TEST_P(QuicPacketSchedulerTest, PaddingInitialPureAcks) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   auto connId = getTestConnectionId();
@@ -314,7 +328,7 @@ TEST_F(QuicPacketSchedulerTest, PaddingInitialPureAcks) {
   EXPECT_EQ(conn.udpSendPacketLen, packetLength);
 }
 
-TEST_F(QuicPacketSchedulerTest, InitialPaddingDoesNotUseWrapper) {
+TEST_P(QuicPacketSchedulerTest, InitialPaddingDoesNotUseWrapper) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   auto connId = getTestConnectionId();
@@ -349,7 +363,7 @@ TEST_F(QuicPacketSchedulerTest, InitialPaddingDoesNotUseWrapper) {
   EXPECT_EQ(conn.udpSendPacketLen, packetLength);
 }
 
-TEST_F(QuicPacketSchedulerTest, CryptoServerInitialPadded) {
+TEST_P(QuicPacketSchedulerTest, CryptoServerInitialPadded) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   auto connId = getTestConnectionId();
@@ -383,7 +397,7 @@ TEST_F(QuicPacketSchedulerTest, CryptoServerInitialPadded) {
   EXPECT_EQ(conn.udpSendPacketLen, packetLength);
 }
 
-TEST_F(QuicPacketSchedulerTest, PadTwoInitialPackets) {
+TEST_P(QuicPacketSchedulerTest, PadTwoInitialPackets) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   auto connId = getTestConnectionId();
@@ -437,7 +451,7 @@ TEST_F(QuicPacketSchedulerTest, PadTwoInitialPackets) {
   EXPECT_EQ(conn.udpSendPacketLen, packetLength);
 }
 
-TEST_F(QuicPacketSchedulerTest, CryptoPaddingRetransmissionClientInitial) {
+TEST_P(QuicPacketSchedulerTest, CryptoPaddingRetransmissionClientInitial) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   auto connId = getTestConnectionId();
@@ -472,7 +486,7 @@ TEST_F(QuicPacketSchedulerTest, CryptoPaddingRetransmissionClientInitial) {
   EXPECT_EQ(conn.udpSendPacketLen, packetLength);
 }
 
-TEST_F(QuicPacketSchedulerTest, CryptoSchedulerOnlySingleLossFits) {
+TEST_P(QuicPacketSchedulerTest, CryptoSchedulerOnlySingleLossFits) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   auto connId = getTestConnectionId();
@@ -502,7 +516,7 @@ TEST_F(QuicPacketSchedulerTest, CryptoSchedulerOnlySingleLossFits) {
   EXPECT_TRUE(scheduler.writeCryptoData(builderWrapper));
 }
 
-TEST_F(QuicPacketSchedulerTest, CryptoWritePartialLossBuffer) {
+TEST_P(QuicPacketSchedulerTest, CryptoWritePartialLossBuffer) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   auto connId = getTestConnectionId();
@@ -540,7 +554,7 @@ TEST_F(QuicPacketSchedulerTest, CryptoWritePartialLossBuffer) {
   EXPECT_FALSE(conn.cryptoState->initialStream.lossBuffer.empty());
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerExists) {
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerExists) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   ASSERT_FALSE(
@@ -564,7 +578,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerExists) {
   EXPECT_LT(builder.remainingSpaceInPkt(), originalSpace);
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameNoSpace) {
+TEST_P(QuicPacketSchedulerTest, StreamFrameNoSpace) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   ASSERT_FALSE(
@@ -589,7 +603,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameNoSpace) {
   EXPECT_EQ(builder.remainingSpaceInPkt(), originalSpace);
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerStreamNotExists) {
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerStreamNotExists) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   auto connId = getTestConnectionId();
@@ -611,7 +625,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerStreamNotExists) {
   EXPECT_EQ(builder.remainingSpaceInPkt(), originalSpace);
 }
 
-TEST_F(QuicPacketSchedulerTest, NoCloningForDSR) {
+TEST_P(QuicPacketSchedulerTest, NoCloningForDSR) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   FrameScheduler noopScheduler("frame", conn);
@@ -638,7 +652,7 @@ TEST_F(QuicPacketSchedulerTest, NoCloningForDSR) {
   EXPECT_FALSE(result->packet.hasValue());
 }
 
-TEST_F(QuicPacketSchedulerTest, CloningSchedulerTest) {
+TEST_P(QuicPacketSchedulerTest, CloningSchedulerTest) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   FrameScheduler noopScheduler("frame", conn);
@@ -668,7 +682,7 @@ TEST_F(QuicPacketSchedulerTest, CloningSchedulerTest) {
   EXPECT_EQ(packetNum, result->clonedPacketIdentifier->packetNumber);
 }
 
-TEST_F(QuicPacketSchedulerTest, WriteOnlyOutstandingPacketsTest) {
+TEST_P(QuicPacketSchedulerTest, WriteOnlyOutstandingPacketsTest) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   FrameScheduler noopScheduler("frame", conn);
@@ -744,7 +758,7 @@ TEST_F(QuicPacketSchedulerTest, WriteOnlyOutstandingPacketsTest) {
   }
 }
 
-TEST_F(QuicPacketSchedulerTest, DoNotCloneProcessedClonedPacket) {
+TEST_P(QuicPacketSchedulerTest, DoNotCloneProcessedClonedPacket) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   FrameScheduler noopScheduler("frame", conn);
@@ -780,8 +794,8 @@ TEST_F(QuicPacketSchedulerTest, DoNotCloneProcessedClonedPacket) {
 }
 
 class CloneAllPacketsWithCryptoFrameTest
-    : public QuicPacketSchedulerTest,
-      public WithParamInterface<std::tuple<bool, bool>> {};
+    : public QuicPacketSchedulerTestBase,
+      public TestWithParam<std::tuple<bool, bool>> {};
 
 TEST_P(
     CloneAllPacketsWithCryptoFrameTest,
@@ -878,7 +892,7 @@ INSTANTIATE_TEST_SUITE_P(
     CloneAllPacketsWithCryptoFrameTest,
     Combine(Bool(), Bool()));
 
-TEST_F(QuicPacketSchedulerTest, DoNotSkipUnclonedCryptoPacket) {
+TEST_P(QuicPacketSchedulerTest, DoNotSkipUnclonedCryptoPacket) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   conn.transportSettings.cloneAllPacketsWithCryptoFrame = true;
@@ -918,7 +932,7 @@ TEST_F(QuicPacketSchedulerTest, DoNotSkipUnclonedCryptoPacket) {
   EXPECT_EQ(firstPacketNum, result->clonedPacketIdentifier->packetNumber);
 }
 
-TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasHandshakeData) {
+TEST_P(QuicPacketSchedulerTest, CloneSchedulerHasHandshakeData) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   FrameScheduler noopScheduler("frame", conn);
@@ -941,7 +955,7 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasHandshakeData) {
 There was a bug that would result in mvfst emit a "empty" PTO packet with
 acks; this is the test case to cover that scenario.
 */
-TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasHandshakeDataAndAcks) {
+TEST_P(QuicPacketSchedulerTest, CloneSchedulerHasHandshakeDataAndAcks) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   conn.version = QuicVersion::MVFST_EXPERIMENTAL2;
@@ -1019,7 +1033,7 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasHandshakeDataAndAcks) {
   EXPECT_TRUE(hasCryptoFrame);
 }
 
-TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasInitialData) {
+TEST_P(QuicPacketSchedulerTest, CloneSchedulerHasInitialData) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   FrameScheduler noopScheduler("frame", conn);
@@ -1030,7 +1044,7 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasInitialData) {
   EXPECT_TRUE(cloningScheduler.hasData());
 }
 
-TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasAppDataData) {
+TEST_P(QuicPacketSchedulerTest, CloneSchedulerHasAppDataData) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   FrameScheduler noopScheduler("frame", conn);
@@ -1041,7 +1055,7 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasAppDataData) {
   EXPECT_TRUE(cloningScheduler.hasData());
 }
 
-TEST_F(QuicPacketSchedulerTest, DoNotCloneHandshake) {
+TEST_P(QuicPacketSchedulerTest, DoNotCloneHandshake) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   FrameScheduler noopScheduler("frame", conn);
@@ -1071,7 +1085,7 @@ TEST_F(QuicPacketSchedulerTest, DoNotCloneHandshake) {
   EXPECT_EQ(expected, result->clonedPacketIdentifier->packetNumber);
 }
 
-TEST_F(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) {
+TEST_P(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   conn.version = QuicVersion::MVFST_EXPERIMENTAL2;
@@ -1127,7 +1141,7 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) {
       *folly::IOBuf::copyBuffer("I'm out of the game"), result->packet->body));
 }
 
-TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) {
+TEST_P(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   auto result = conn.streamManager->setMaxLocalBidirectionalStreams(10);
@@ -1211,7 +1225,7 @@ TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) {
   EXPECT_EQ(1, streamWindowUpdateCounter);
 }
 
-TEST_F(QuicPacketSchedulerTest, CloningSchedulerWithInplaceBuilder) {
+TEST_P(QuicPacketSchedulerTest, CloningSchedulerWithInplaceBuilder) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   conn.transportSettings.dataPathType = DataPathType::ContinuousMemory;
@@ -1254,7 +1268,7 @@ TEST_F(QuicPacketSchedulerTest, CloningSchedulerWithInplaceBuilder) {
   EXPECT_GT(buf->length(), 10);
 }
 
-TEST_F(QuicPacketSchedulerTest, CloningSchedulerWithInplaceBuilderFullPacket) {
+TEST_P(QuicPacketSchedulerTest, CloningSchedulerWithInplaceBuilderFullPacket) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   auto streamResult = conn.streamManager->setMaxLocalBidirectionalStreams(10);
@@ -1339,7 +1353,7 @@ TEST_F(QuicPacketSchedulerTest, CloningSchedulerWithInplaceBuilderFullPacket) {
   EXPECT_EQ(buf->length(), conn.udpSendPacketLen);
 }
 
-TEST_F(QuicPacketSchedulerTest, CloneLargerThanOriginalPacket) {
+TEST_P(QuicPacketSchedulerTest, CloneLargerThanOriginalPacket) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   conn.udpSendPacketLen = 1000;
@@ -1406,7 +1420,7 @@ TEST_F(QuicPacketSchedulerTest, CloneLargerThanOriginalPacket) {
 
 class AckSchedulingTest : public TestWithParam<PacketNumberSpace> {};
 
-TEST_F(QuicPacketSchedulerTest, AckStateHasAcksToSchedule) {
+TEST_P(QuicPacketSchedulerTest, AckStateHasAcksToSchedule) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   EXPECT_FALSE(hasAcksToSchedule(*conn.ackStates.initialAckState));
@@ -1424,7 +1438,7 @@ TEST_F(QuicPacketSchedulerTest, AckStateHasAcksToSchedule) {
   EXPECT_TRUE(hasAcksToSchedule(*conn.ackStates.handshakeAckState));
 }
 
-TEST_F(QuicPacketSchedulerTest, AckSchedulerHasAcksToSchedule) {
+TEST_P(QuicPacketSchedulerTest, AckSchedulerHasAcksToSchedule) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   AckScheduler initialAckScheduler(
@@ -1448,7 +1462,7 @@ TEST_F(QuicPacketSchedulerTest, AckSchedulerHasAcksToSchedule) {
   EXPECT_TRUE(handshakeAckScheduler.hasPendingAcks());
 }
 
-TEST_F(QuicPacketSchedulerTest, LargestAckToSend) {
+TEST_P(QuicPacketSchedulerTest, LargestAckToSend) {
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
   EXPECT_EQ(none, largestAckToSend(*conn.ackStates.initialAckState));
@@ -1464,7 +1478,7 @@ TEST_F(QuicPacketSchedulerTest, LargestAckToSend) {
   EXPECT_EQ(none, largestAckToSend(conn.ackStates.appDataAckState));
 }
 
-TEST_F(QuicPacketSchedulerTest, NeedsToSendAckWithoutAcksAvailable) {
+TEST_P(QuicPacketSchedulerTest, NeedsToSendAckWithoutAcksAvailable) {
   // This covers the scheduler behavior when an IMMEDIATE_ACK frame is received.
   QuicClientConnectionState conn(
       FizzClientQuicHandshakeContext::Builder().build());
@@ -1493,8 +1507,8 @@ TEST_F(QuicPacketSchedulerTest, NeedsToSendAckWithoutAcksAvailable) {
   EXPECT_TRUE(handshakeAckScheduler.hasPendingAcks());
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   StreamFrameScheduler scheduler(conn);
@@ -1509,11 +1523,15 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) {
   auto builder = setupMockPacketBuilder();
   scheduler.writeStreams(*builder);
   verifyStreamFrames(*builder, {f1, f2, f3});
-  EXPECT_EQ(nextScheduledStreamID(conn), 0);
+  if (GetParam()) {
+    EXPECT_TRUE(conn.streamManager->writeQueue().empty());
+  } else {
+    EXPECT_EQ(nextScheduledStreamID(conn), 0);
+  }
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   StreamFrameScheduler scheduler(conn);
@@ -1526,6 +1544,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) {
   auto f2 = writeDataToStream(conn, stream2, "some data");
   auto f3 = writeDataToStream(conn, stream3, "some data");
 
+  // write a normal size packet from stream1
   auto builder = createPacketBuilder(conn);
   scheduler.writeStreams(builder);
 
@@ -1535,10 +1554,10 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) {
   verifyStreamFrames(*builder2, {f2, f3, f1});
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinNextsPer) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinNextsPer) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
-  conn.streamManager->writeQueue().setMaxNextsPerStream(2);
+  conn.streamManager->setWriteQueueMaxNextsPerStream(2);
   StreamFrameScheduler scheduler(conn);
 
   auto stream1 = createStream(conn);
@@ -1567,8 +1586,8 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinNextsPer) {
   verifyStreamFrames(*builder2, {stream1, stream1, stream2, stream3, stream1});
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   conn.transportSettings.streamFramePerPacket = true;
   StreamFrameScheduler scheduler(conn);
@@ -1582,6 +1601,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) {
   auto f2 = writeDataToStream(conn, stream2, "some data");
   auto f3 = writeDataToStream(conn, stream3, "some data");
 
+  // Write a normal size packet from stream1
   auto builder = createPacketBuilder(conn);
   scheduler.writeStreams(builder);
   EXPECT_EQ(nextScheduledStreamID(conn), stream2);
@@ -1596,10 +1616,10 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) {
   verifyStreamFrames(*builder2, {f1});
 }
 
-TEST_F(
+TEST_P(
     QuicPacketSchedulerTest,
     StreamFrameSchedulerRoundRobinStreamPerPacketHitsDsr) {
-  auto connPtr = createConn(10, 100000, 100000);
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   conn.transportSettings.streamFramePerPacket = true;
   StreamFrameScheduler scheduler(conn);
@@ -1636,7 +1656,7 @@ TEST_F(
       dsrStream->pendingWrites)); // Move and destruct the pending writes
   conn.streamManager->updateWritableStreams(*dsrStream);
 
-  // The default is to wraparound initially.
+  // Write a normal size packet from stream1
   auto builder1 = createPacketBuilder(conn);
   scheduler.writeStreams(builder1);
 
@@ -1656,8 +1676,8 @@ TEST_F(
   verifyStreamFrames(*builder2, {f2, f3});
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   StreamFrameScheduler scheduler(conn);
@@ -1670,7 +1690,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
   auto f2 = writeDataToStream(conn, stream2, "some data");
   auto f3 = writeDataToStream(conn, stream3, "some data");
 
-  // The default is to wraparound initially.
+  // Write a normal size packet from stream1
   auto builder1 = createPacketBuilder(conn);
   scheduler.writeStreams(builder1);
 
@@ -1683,8 +1703,8 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
   verifyStreamFrames(*builder2, {f1, f2, f3});
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   conn.transportSettings.defaultPriority =
       HTTPPriorityQueue::Priority(0, false);
@@ -1699,6 +1719,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) {
   auto f2 = writeDataToStream(conn, stream2, "some data");
   auto f3 = writeDataToStream(conn, stream3, "some data");
 
+  // Write a normal size packet from stream1
   auto builder1 = createPacketBuilder(conn);
   scheduler.writeStreams(builder1);
 
@@ -1711,8 +1732,8 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) {
   verifyStreamFrames(*builder2, {f1, f2, f3});
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   StreamFrameScheduler scheduler(conn);
@@ -1732,24 +1753,30 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) {
   auto f3 = writeDataToStream(conn, stream3, "some data");
   auto f4 = writeDataToStream(conn, stream4, "some data");
 
+  // This writes a normal size packet with 2, 4, 1
   auto builder1 = createPacketBuilder(conn);
   scheduler.writeStreams(builder1);
   EXPECT_EQ(nextScheduledStreamID(conn), stream3);
   EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2);
 
+  // 2 and 4 did not get removed from writable, so they get repeated here
   // Should write frames for stream2, stream4, followed by stream 3 then 1.
   auto builder2 = setupMockPacketBuilder();
   scheduler.writeStreams(*builder2);
   verifyStreamFrames(*builder2, {f2, f4, f3, f1});
-  EXPECT_EQ(nextScheduledStreamID(conn), stream3);
   EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2);
+  if (GetParam()) {
+    EXPECT_TRUE(conn.streamManager->writeQueue().empty());
+  } else {
+    EXPECT_EQ(nextScheduledStreamID(conn), stream3);
+  }
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   StreamFrameScheduler scheduler(conn);
@@ -1759,11 +1786,15 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) {
 
   auto builder1 = createPacketBuilder(conn);
   scheduler.writeStreams(builder1);
-  EXPECT_EQ(nextScheduledStreamID(conn), 0);
+  if (GetParam()) {
+    EXPECT_TRUE(conn.streamManager->writeQueue().empty());
+  } else {
+    EXPECT_EQ(nextScheduledStreamID(conn), 0);
+  }
 }
 
-TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRemoveOne) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, StreamFrameSchedulerRemoveOne) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   StreamFrameScheduler scheduler(conn);
@@ -1778,15 +1809,17 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRemoveOne) {
   verifyStreamFrames(*builder, {f1, f2});
 
   // Manually remove a stream and set the next scheduled to that stream.
-  conn.streamManager->writeQueue().setNextScheduledStream(stream2);
-  conn.streamManager->removeWritable(*conn.streamManager->findStream(stream2));
+  conn.streamManager->removeWritable(*conn.streamManager->findStream(stream1));
+  // the queue is empty, reload it
+  conn.streamManager->updateWritableStreams(
+      *conn.streamManager->findStream(stream2));
 
   scheduler.writeStreams(*builder);
   ASSERT_EQ(builder->frames_.size(), 1);
-  verifyStreamFrames(*builder, {f1});
+  verifyStreamFrames(*builder, {f2});
 }
 
-TEST_F(
+TEST_P(
     QuicPacketSchedulerTest,
     CloningSchedulerWithInplaceBuilderDoNotEncodeHeaderWithoutBuild) {
   QuicClientConnectionState conn(
@@ -1831,7 +1864,7 @@ TEST_F(
   EXPECT_EQ(buf->length(), 0);
 }
 
-TEST_F(
+TEST_P(
     QuicPacketSchedulerTest,
     CloningSchedulerWithInplaceBuilderRollbackBufWhenFailToRebuild) {
   QuicClientConnectionState conn(
@@ -1872,8 +1905,8 @@ TEST_F(
   EXPECT_EQ(buf->length(), 0);
 }
 
-TEST_F(QuicPacketSchedulerTest, HighPriNewDataBeforeLowPriLossData) {
-  auto connPtr = createConn(10, 100000, 100000);
+TEST_P(QuicPacketSchedulerTest, HighPriNewDataBeforeLowPriLossData) {
+  auto connPtr = createConn(10, 100000, 100000, GetParam());
   auto& conn = *connPtr;
   StreamFrameScheduler scheduler(conn);
@@ -1895,7 +1928,7 @@ TEST_F(QuicPacketSchedulerTest, HighPriNewDataBeforeLowPriLossData) {
   EXPECT_EQ(highPriStreamId, writeStreamFrame.streamId);
 }
 
-TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) {
+TEST_P(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   ASSERT_FALSE(
@@ -1966,7 +1999,7 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) {
   EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength());
 }
 
-TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) {
+TEST_P(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   ASSERT_FALSE(
@@ -2016,7 +2049,7 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) {
   EXPECT_FALSE(scheduler.hasPendingData());
 }
 
-TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) {
+TEST_P(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   ASSERT_FALSE(
@@ -2089,7 +2122,91 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) {
   EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength());
 }
 
-TEST_F(QuicPacketSchedulerTest, RunOutFlowControlDuringStreamWrite) {
+TEST_P(QuicPacketSchedulerTest, MultipleStreamsRunOutOfFlowControl) {
+  auto connPtr = createConn(10, 1000, 2000, GetParam());
+  auto& conn = *connPtr;
+  conn.udpSendPacketLen = 2000;
+  auto highPriStreamId =
+      (*conn.streamManager->createNextBidirectionalStream())->id;
+  auto lowPriStreamId =
+      (*conn.streamManager->createNextBidirectionalStream())->id;
+  auto highPriStream = conn.streamManager->findStream(highPriStreamId);
+  auto lowPriStream = conn.streamManager->findStream(lowPriStreamId);
+
+  // Write new data to high priority stream in excess of max data
+  auto newData = buildRandomInputData(2000);
+  ASSERT_TRUE(writeDataToQuicStream(*highPriStream, std::move(newData), true));
+  conn.streamManager->updateWritableStreams(
+      *highPriStream, /*connFlowControlOpen=*/true);
+
+  // Fake a loss data for low priority stream
+  lowPriStream->currentWriteOffset = 201;
+  auto lossData = buildRandomInputData(200);
+  lowPriStream->lossBuffer.emplace_back(
+      ChainedByteRangeHead(lossData), 0, true);
+  conn.streamManager->updateWritableStreams(
+      *lowPriStream, /*connFlowControlOpen=*/true);
+
+  StreamFrameScheduler scheduler(conn);
+
+  ShortHeader shortHeader1(
+      ProtectionType::KeyPhaseZero,
+      getTestConnectionId(),
+      getNextPacketNum(conn, PacketNumberSpace::AppData));
+  RegularQuicPacketBuilder builder1(
+      conn.udpSendPacketLen,
+      std::move(shortHeader1),
+      conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0));
+  ASSERT_TRUE(builder1.encodePacketHeader());
+  scheduler.writeStreams(builder1);
+  auto packet1 = std::move(builder1).buildPacket().packet;
+  ASSERT_TRUE(updateConnection(
+      conn, none, packet1, Clock::now(), 1200, 0, false /* isDSR */));
+  ASSERT_EQ(2, packet1.frames.size());
+  auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame();
+  EXPECT_EQ(highPriStreamId, writeStreamFrame1.streamId);
+  EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn));
+  EXPECT_EQ(1000, highPriStream->pendingWrites.chainLength());
+  EXPECT_EQ(1, highPriStream->retransmissionBuffer.size());
+  EXPECT_EQ(1000, highPriStream->retransmissionBuffer[0]->data.chainLength());
+
+  auto& writeStreamFrame2 = *packet1.frames[1].asWriteStreamFrame();
+  EXPECT_EQ(lowPriStreamId, writeStreamFrame2.streamId);
+  EXPECT_EQ(200, writeStreamFrame2.len);
+  EXPECT_TRUE(lowPriStream->lossBuffer.empty());
+  EXPECT_EQ(1, lowPriStream->retransmissionBuffer.size());
+  EXPECT_EQ(200, lowPriStream->retransmissionBuffer[0]->data.chainLength());
+
+  // Simulate additional flow control granted
+  conn.flowControlState.peerAdvertisedMaxOffset = 2000;
+  conn.streamManager->onMaxData();
+  // Don't need to call updateWritableStreams, onMaxData updates the state for
+  // any stream blocked on conn flow control
+
+  // Write remaining data for high priority stream
+  ShortHeader shortHeader2(
+      ProtectionType::KeyPhaseZero,
+      getTestConnectionId(),
+      getNextPacketNum(conn, PacketNumberSpace::AppData));
+  RegularQuicPacketBuilder builder2(
+      conn.udpSendPacketLen,
+      std::move(shortHeader2),
+      conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0));
+  ASSERT_TRUE(builder2.encodePacketHeader());
+  scheduler.writeStreams(builder2);
+  auto packet2 = std::move(builder2).buildPacket().packet;
+  ASSERT_TRUE(updateConnection(
+      conn, none, packet2, Clock::now(), 1000, 0, false /* isDSR */));
+  ASSERT_EQ(1, packet2.frames.size());
+  auto& writeStreamFrame3 = *packet2.frames[0].asWriteStreamFrame();
+  EXPECT_EQ(highPriStreamId, writeStreamFrame3.streamId);
+  EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn));
+  EXPECT_EQ(
+      1000, highPriStream->retransmissionBuffer[1000]->data.chainLength());
+  EXPECT_EQ(1000, writeStreamFrame3.len);
+}
+
+TEST_P(QuicPacketSchedulerTest, RunOutFlowControlDuringStreamWrite) {
   QuicServerConnectionState conn(
       FizzServerQuicHandshakeContext::Builder().build());
   ASSERT_FALSE(
@ -2145,7 +2262,7 @@ TEST_F(QuicPacketSchedulerTest, RunOutFlowControlDuringStreamWrite) {
EXPECT_EQ(200, stream2->retransmissionBuffer[0]->data.chainLength()); EXPECT_EQ(200, stream2->retransmissionBuffer[0]->data.chainLength());
} }
TEST_F(QuicPacketSchedulerTest, WritingFINFromBufWithBufMetaFirst) { TEST_P(QuicPacketSchedulerTest, WritingFINFromBufWithBufMetaFirst) {
QuicServerConnectionState conn( QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build()); FizzServerQuicHandshakeContext::Builder().build());
ASSERT_FALSE( ASSERT_FALSE(
@ -2189,7 +2306,7 @@ TEST_F(QuicPacketSchedulerTest, WritingFINFromBufWithBufMetaFirst) {
EXPECT_EQ(stream->currentWriteOffset, 6); EXPECT_EQ(stream->currentWriteOffset, 6);
} }
TEST_F(QuicPacketSchedulerTest, NoFINWriteWhenBufMetaWrittenFIN) { TEST_P(QuicPacketSchedulerTest, NoFINWriteWhenBufMetaWrittenFIN) {
QuicServerConnectionState conn( QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build()); FizzServerQuicHandshakeContext::Builder().build());
ASSERT_FALSE( ASSERT_FALSE(
@ -2235,7 +2352,7 @@ TEST_F(QuicPacketSchedulerTest, NoFINWriteWhenBufMetaWrittenFIN) {
EXPECT_FALSE(scheduler2.hasPendingData()); EXPECT_FALSE(scheduler2.hasPendingData());
} }
TEST_F(QuicPacketSchedulerTest, DatagramFrameSchedulerMultipleFramesPerPacket) { TEST_P(QuicPacketSchedulerTest, DatagramFrameSchedulerMultipleFramesPerPacket) {
QuicClientConnectionState conn( QuicClientConnectionState conn(
FizzClientQuicHandshakeContext::Builder().build()); FizzClientQuicHandshakeContext::Builder().build());
conn.datagramState.maxReadFrameSize = std::numeric_limits<uint16_t>::max(); conn.datagramState.maxReadFrameSize = std::numeric_limits<uint16_t>::max();
@ -2263,7 +2380,7 @@ TEST_F(QuicPacketSchedulerTest, DatagramFrameSchedulerMultipleFramesPerPacket) {
ASSERT_EQ(frames.size(), 2); ASSERT_EQ(frames.size(), 2);
} }
TEST_F(QuicPacketSchedulerTest, DatagramFrameSchedulerOneFramePerPacket) { TEST_P(QuicPacketSchedulerTest, DatagramFrameSchedulerOneFramePerPacket) {
QuicClientConnectionState conn( QuicClientConnectionState conn(
FizzClientQuicHandshakeContext::Builder().build()); FizzClientQuicHandshakeContext::Builder().build());
conn.datagramState.maxReadFrameSize = std::numeric_limits<uint16_t>::max(); conn.datagramState.maxReadFrameSize = std::numeric_limits<uint16_t>::max();
@ -2296,7 +2413,7 @@ TEST_F(QuicPacketSchedulerTest, DatagramFrameSchedulerOneFramePerPacket) {
ASSERT_EQ(frames.size(), 2); ASSERT_EQ(frames.size(), 2);
} }
TEST_F(QuicPacketSchedulerTest, DatagramFrameWriteWhenRoomAvailable) { TEST_P(QuicPacketSchedulerTest, DatagramFrameWriteWhenRoomAvailable) {
QuicClientConnectionState conn( QuicClientConnectionState conn(
FizzClientQuicHandshakeContext::Builder().build()); FizzClientQuicHandshakeContext::Builder().build());
conn.datagramState.maxReadFrameSize = std::numeric_limits<uint16_t>::max(); conn.datagramState.maxReadFrameSize = std::numeric_limits<uint16_t>::max();
@ -2327,7 +2444,7 @@ TEST_F(QuicPacketSchedulerTest, DatagramFrameWriteWhenRoomAvailable) {
ASSERT_EQ(frames.size(), 1); ASSERT_EQ(frames.size(), 1);
} }
TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingWithSpaceForPadding) { TEST_P(QuicPacketSchedulerTest, ShortHeaderPaddingWithSpaceForPadding) {
QuicServerConnectionState conn( QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build()); FizzServerQuicHandshakeContext::Builder().build());
size_t paddingModulo = 16; size_t paddingModulo = 16;
@ -2401,7 +2518,7 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingWithSpaceForPadding) {
EXPECT_EQ(packetLength1, packetLength2); EXPECT_EQ(packetLength1, packetLength2);
} }
TEST_F(QuicPacketSchedulerTest, ShortHeaderFixedPaddingAtStart) { TEST_P(QuicPacketSchedulerTest, ShortHeaderFixedPaddingAtStart) {
QuicServerConnectionState conn( QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build()); FizzServerQuicHandshakeContext::Builder().build());
conn.transportSettings.fixedShortHeaderPadding = 2; conn.transportSettings.fixedShortHeaderPadding = 2;
@ -2451,7 +2568,7 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderFixedPaddingAtStart) {
EXPECT_TRUE(frames[2].asPaddingFrame()); EXPECT_TRUE(frames[2].asPaddingFrame());
} }
TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingNearMaxPacketLength) { TEST_P(QuicPacketSchedulerTest, ShortHeaderPaddingNearMaxPacketLength) {
QuicServerConnectionState conn( QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build()); FizzServerQuicHandshakeContext::Builder().build());
conn.udpSendPacketLen = 1000; conn.udpSendPacketLen = 1000;
@ -2503,7 +2620,7 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingNearMaxPacketLength) {
EXPECT_EQ(packetLength, conn.udpSendPacketLen); EXPECT_EQ(packetLength, conn.udpSendPacketLen);
} }
TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingMaxPacketLength) { TEST_P(QuicPacketSchedulerTest, ShortHeaderPaddingMaxPacketLength) {
QuicServerConnectionState conn( QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build()); FizzServerQuicHandshakeContext::Builder().build());
conn.udpSendPacketLen = 1000; conn.udpSendPacketLen = 1000;
@ -2554,7 +2671,7 @@ TEST_F(QuicPacketSchedulerTest, ShortHeaderPaddingMaxPacketLength) {
EXPECT_EQ(packetLength, conn.udpSendPacketLen); EXPECT_EQ(packetLength, conn.udpSendPacketLen);
} }
-TEST_F(QuicPacketSchedulerTest, ImmediateAckFrameSchedulerOnRequest) {
+TEST_P(QuicPacketSchedulerTest, ImmediateAckFrameSchedulerOnRequest) {
QuicClientConnectionState conn(
FizzClientQuicHandshakeContext::Builder().build());
conn.pendingEvents.requestImmediateAck = true;
@@ -2592,7 +2709,7 @@ TEST_F(QuicPacketSchedulerTest, ImmediateAckFrameSchedulerOnRequest) {
EXPECT_EQ(conn.udpSendPacketLen, packetLength);
}
-TEST_F(QuicPacketSchedulerTest, ImmediateAckFrameSchedulerNotRequested) {
+TEST_P(QuicPacketSchedulerTest, ImmediateAckFrameSchedulerNotRequested) {
QuicClientConnectionState conn(
FizzClientQuicHandshakeContext::Builder().build());
conn.pendingEvents.requestImmediateAck = false;
@@ -2632,7 +2749,7 @@ TEST_F(QuicPacketSchedulerTest, ImmediateAckFrameSchedulerNotRequested) {
EXPECT_LT(packetLength, conn.udpSendPacketLen);
}
-TEST_F(QuicPacketSchedulerTest, RstStreamSchedulerReliableReset) {
+TEST_P(QuicPacketSchedulerTest, RstStreamSchedulerReliableReset) {
QuicClientConnectionState conn(
FizzClientQuicHandshakeContext::Builder().build());
ASSERT_FALSE(
@@ -2718,12 +2835,12 @@ TEST_F(QuicPacketSchedulerTest, RstStreamSchedulerReliableReset) {
EXPECT_FALSE(conn.pendingEvents.resets.contains(stream->id));
}
-TEST_F(QuicPacketSchedulerTest, PausedPriorityEnabled) {
+TEST_P(QuicPacketSchedulerTest, PausedPriorityEnabled) {
static const auto kSequentialPriority = HTTPPriorityQueue::Priority(3, false);
static const HTTPPriorityQueue::Priority kPausedPriority =
HTTPPriorityQueue::Priority::PAUSED;
-auto connPtr = createConn(10, 100000, 100000);
+auto connPtr = createConn(10, 100000, 100000, GetParam());
auto& conn = *connPtr;
StreamFrameScheduler scheduler(conn);
@@ -2753,14 +2870,14 @@ TEST_F(QuicPacketSchedulerTest, PausedPriorityEnabled) {
ASSERT_FALSE(conn.streamManager->hasWritable());
}
-TEST_F(QuicPacketSchedulerTest, PausedPriorityDisabled) {
+TEST_P(QuicPacketSchedulerTest, PausedPriorityDisabled) {
static const auto kSequentialPriority = HTTPPriorityQueue::Priority(3, false);
static const HTTPPriorityQueue::Priority kPausedPriority =
HTTPPriorityQueue::Priority::PAUSED;
-auto connPtr = createConn(10, 100000, 100000);
+auto connPtr = createConn(10, 100000, 100000, GetParam());
auto& conn = *connPtr;
-conn.transportSettings.disablePausedPriority = true;
+transportSettings.disablePausedPriority = true;
StreamFrameScheduler scheduler(conn);
auto pausedStreamId = createStream(conn, kPausedPriority);
@@ -2774,7 +2891,7 @@ TEST_F(QuicPacketSchedulerTest, PausedPriorityDisabled) {
verifyStreamFrames(*builder, {regularFrame, pausedFrame});
}
-TEST_F(QuicPacketSchedulerTest, FixedShortHeaderPadding) {
+TEST_P(QuicPacketSchedulerTest, FixedShortHeaderPadding) {
QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build());
conn.transportSettings.fixedShortHeaderPadding = 2;
@@ -2828,7 +2945,7 @@ TEST_F(QuicPacketSchedulerTest, FixedShortHeaderPadding) {
// This test class sets up a connection with all the fields that can be included
// in an ACK. The fixtures for this class confirm that the scheduler writes the
// correct frame type and fields enabled by the connection state.
-class QuicAckSchedulerTest : public Test {
+class QuicAckSchedulerTest : public QuicPacketSchedulerTestBase, public Test {
protected:
QuicAckSchedulerTest()
: conn_(createConn(10, 100000, 100000)),
@@ -3247,4 +3364,9 @@ TEST_F(QuicAckSchedulerTest, AckExtendedTakesPrecedenceOverReceiveTimestamps) {
EXPECT_EQ(ackFrame->ecnCECount, 3);
}
+INSTANTIATE_TEST_SUITE_P(
+QuicPacketSchedulerTest,
+QuicPacketSchedulerTest,
+::testing::Values(false, true));
} // namespace quic::test
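The INSTANTIATE_TEST_SUITE_P block above runs the whole scheduler suite twice, once per queue implementation, with GetParam() selecting the variant (note the extra argument to createConn in the paused-priority tests). A minimal sketch of how such a value-parameterized fixture can be wired, assuming the fixture owns the TransportSettings it hands to createConn; the SetUp body here is illustrative, not the actual fixture:

class QuicPacketSchedulerTest : public QuicPacketSchedulerTestBase,
                                public testing::TestWithParam<bool> {
 protected:
  void SetUp() override {
    // Assumption: the bool parameter maps onto the new TransportSetting.
    transportSettings.useNewPriorityQueue = GetParam();
  }
};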

View File

@@ -124,6 +124,14 @@ class QuicTransportTest : public Test {
return MockByteEventCallback::getTxMatcher(id, offset);
}
StreamId nextScheduledStreamID(QuicConnectionStateBase& conn) {
auto oldWriteQueue = conn.streamManager->oldWriteQueue();
if (oldWriteQueue) {
return oldWriteQueue->getNextScheduledStream();
}
return conn.streamManager->writeQueue().peekNextScheduledID().asStreamID();
}
protected:
folly::EventBase evb_;
std::shared_ptr<FollyQuicEventBase> qEvb_;
@@ -4606,8 +4614,8 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) {
EXPECT_EQ(streamFrame->streamId, s1);
conn.outstandings.reset();
-// Start from stream2 instead of stream1
-conn.streamManager->writeQueue().setNextScheduledStream(s2);
+// Queue already points to stream2
+EXPECT_EQ(nextScheduledStreamID(conn), stream2->id);
writableBytes = kDefaultUDPSendPacketLen - 100;
EXPECT_CALL(*socket_, write(_, _, _))
@@ -4633,8 +4641,13 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) {
EXPECT_EQ(streamFrame2->streamId, s2);
conn.outstandings.reset();
-// Test wrap around
-conn.streamManager->writeQueue().setNextScheduledStream(s2);
+// Test wrap around by skipping a stream
+auto oldWriteQueue = conn.streamManager->oldWriteQueue();
+if (oldWriteQueue) {
+oldWriteQueue->setNextScheduledStream(s2);
+} else {
+conn.streamManager->writeQueue().getNextScheduledID(quic::none);
+}
writableBytes = kDefaultUDPSendPacketLen;
EXPECT_CALL(*socket_, write(_, _, _))
.WillOnce(testing::WithArgs<1, 2>(Invoke(getTotalIovecLen)));
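The helper and rewritten assertions above lean on the split between peekNextScheduledID(), which observes the head of the new queue without consuming it, and getNextScheduledID(), which advances the scheduling state; the wrap-around test uses the latter purely to skip past s2. A condensed sketch of that distinction (variable names illustrative):

auto& queue = conn.streamManager->writeQueue();
auto headId = queue.peekNextScheduledID();           // non-destructive peek
auto takenId = queue.getNextScheduledID(quic::none); // advances round-robin state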

View File

@@ -765,7 +765,10 @@ void overridePacketWithToken(
}
bool writableContains(QuicStreamManager& streamManager, StreamId streamId) {
-return streamManager.writeQueue().count(streamId) > 0 ||
+auto oldQueue = streamManager.oldWriteQueue();
+return (oldQueue && oldQueue->count(streamId) > 0) ||
+streamManager.writeQueue().contains(
+PriorityQueue::Identifier::fromStreamID(streamId)) > 0 ||
streamManager.controlWriteQueue().count(streamId) > 0;
}

View File

@@ -48,6 +48,30 @@ DSRStreamFrameScheduler::enrichAndAddSendInstruction(
return result;
}
DSRStreamFrameScheduler::SchedulingResult
DSRStreamFrameScheduler::enrichAndAddSendInstruction(
uint32_t encodedSize,
DSRStreamFrameScheduler::SchedulingResult result,
DSRPacketBuilderBase& packetBuilder,
SendInstruction::Builder& instructionBuilder,
const PriorityQueue& writeQueue,
QuicStreamState& stream) {
enrichInstruction(instructionBuilder, stream);
packetBuilder.addSendInstruction(
instructionBuilder.build(), encodedSize, stream.streamPacketIdx++);
result.writeSuccess = true;
result.sender = stream.dsrSender.get();
auto id = writeQueue.peekNextScheduledID();
CHECK(id.isStreamID());
auto nextStreamId = id.asStreamID();
auto nextStream =
CHECK_NOTNULL(conn_.streamManager->findStream(nextStreamId));
if (nextStream->hasSchedulableData()) {
nextStreamNonDsr_ = true;
}
return result;
}
/**
* Note the difference between this and the regular StreamFrameScheduler.
* There is no current way of knowing if two streams can be DSR-ed from the
@@ -56,8 +80,126 @@
*/
folly::Expected<DSRStreamFrameScheduler::SchedulingResult, QuicError>
DSRStreamFrameScheduler::writeStream(DSRPacketBuilderBase& builder) {
DSRStreamFrameScheduler::writeStream(DSRPacketBuilderBase& builder) { DSRStreamFrameScheduler::writeStream(DSRPacketBuilderBase& builder) {
auto oldWriteQueue = conn_.streamManager->oldWriteQueue();
if (oldWriteQueue) {
return writeStreamImpl(builder, *oldWriteQueue);
} else {
return writeStreamImpl(builder, conn_.streamManager->writeQueue());
}
}
folly::Expected<DSRStreamFrameScheduler::SchedulingResult, QuicError>
DSRStreamFrameScheduler::writeStreamImpl(
DSRPacketBuilderBase& builder,
PriorityQueue& writeQueue) {
SchedulingResult result;
if (writeQueue.empty()) {
return result;
}
auto txn = writeQueue.beginTransaction();
auto guard =
folly::makeGuard([&] { writeQueue.rollbackTransaction(std::move(txn)); });
auto id = writeQueue.getNextScheduledID(quic::none);
CHECK(id.isStreamID());
auto streamId = id.asStreamID();
auto stream = conn_.streamManager->findStream(streamId);
CHECK(stream);
if (!stream->dsrSender || !stream->hasSchedulableDsr()) {
nextStreamNonDsr_ = true;
return result;
}
bool hasFreshBufMeta = stream->writeBufMeta.length > 0;
bool hasLossBufMeta = !stream->lossBufMetas.empty();
CHECK(stream->hasSchedulableDsr());
if (hasLossBufMeta) {
SendInstruction::Builder instructionBuilder(conn_, streamId);
auto encodedSizeExpected = writeDSRStreamFrame(
builder,
instructionBuilder,
streamId,
stream->lossBufMetas.front().offset,
stream->lossBufMetas.front().length,
stream->lossBufMetas.front()
.length, // flowControlLen shouldn't be used to limit loss write
stream->lossBufMetas.front().eof,
stream->currentWriteOffset + stream->pendingWrites.chainLength());
if (encodedSizeExpected.hasError()) {
return folly::makeUnexpected(encodedSizeExpected.error());
}
auto encodedSize = encodedSizeExpected.value();
if (encodedSize > 0) {
if (builder.remainingSpace() < encodedSize) {
return result;
}
guard.dismiss();
writeQueue.commitTransaction(std::move(txn));
return enrichAndAddSendInstruction(
encodedSize,
std::move(result),
builder,
instructionBuilder,
writeQueue,
*stream);
}
}
if (!hasFreshBufMeta || builder.remainingSpace() == 0) {
return result;
}
// If we have fresh BufMeta to write, the offset cannot be 0. This is based on
// the current limit that some real data has to be written into the stream
// before BufMetas.
CHECK_NE(stream->writeBufMeta.offset, 0);
uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
if (connWritableBytes == 0) {
return result;
}
// When stream still has pendingWrites, getSendStreamFlowControlBytesWire
// counts from currentWriteOffset which isn't right for BufMetas.
auto streamFlowControlLen = std::min(
getSendStreamFlowControlBytesWire(*stream),
stream->flowControlState.peerAdvertisedMaxOffset -
stream->writeBufMeta.offset);
auto flowControlLen = std::min(streamFlowControlLen, connWritableBytes);
bool canWriteFin = stream->finalWriteOffset.has_value() &&
stream->writeBufMeta.length <= flowControlLen;
SendInstruction::Builder instructionBuilder(conn_, streamId);
auto encodedSizeExpected = writeDSRStreamFrame(
builder,
instructionBuilder,
streamId,
stream->writeBufMeta.offset,
stream->writeBufMeta.length,
flowControlLen,
canWriteFin,
stream->currentWriteOffset + stream->pendingWrites.chainLength());
if (encodedSizeExpected.hasError()) {
return folly::makeUnexpected(encodedSizeExpected.error());
}
auto encodedSize = encodedSizeExpected.value();
if (encodedSize > 0) {
if (builder.remainingSpace() < encodedSize) {
return result;
}
guard.dismiss();
writeQueue.commitTransaction(std::move(txn));
return enrichAndAddSendInstruction(
encodedSize,
std::move(result),
builder,
instructionBuilder,
writeQueue,
*stream);
}
return result;
}
folly::Expected<DSRStreamFrameScheduler::SchedulingResult, QuicError>
DSRStreamFrameScheduler::writeStreamImpl(
DSRPacketBuilderBase& builder,
const deprecated::PriorityQueue& writeQueue) {
SchedulingResult result;
-auto& writeQueue = conn_.streamManager->writeQueue();
const auto& levelIter = std::find_if(
writeQueue.levels.cbegin(),
writeQueue.levels.cend(),
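The new-queue path above wraps every dequeue in a queue transaction: a stream is popped under a rollback guard and the transaction is committed only once a frame is actually written, so a stream that yields no frame keeps its place. A condensed sketch of the idiom (wroteFrame is a hypothetical stand-in for the frame-writing step):

auto txn = writeQueue.beginTransaction();
auto guard =
    folly::makeGuard([&] { writeQueue.rollbackTransaction(std::move(txn)); });
auto id = writeQueue.getNextScheduledID(quic::none);
if (wroteFrame(id)) {
  guard.dismiss(); // keep the dequeue
  writeQueue.commitTransaction(std::move(txn));
}
// Otherwise the guard fires and the queue state is rolled back.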

View File

@@ -47,6 +47,21 @@ class DSRStreamFrameScheduler {
const deprecated::PriorityQueue::LevelItr&,
QuicStreamState&);
SchedulingResult enrichAndAddSendInstruction(
uint32_t,
SchedulingResult,
DSRPacketBuilderBase&,
SendInstruction::Builder&,
const PriorityQueue&,
QuicStreamState&);
folly::Expected<DSRStreamFrameScheduler::SchedulingResult, QuicError>
writeStreamImpl(
DSRPacketBuilderBase& builder,
const deprecated::PriorityQueue&);
folly::Expected<DSRStreamFrameScheduler::SchedulingResult, QuicError>
writeStreamImpl(DSRPacketBuilderBase& builder, PriorityQueue&);
private:
QuicServerConnectionState& conn_;
bool nextStreamNonDsr_{false};

View File

@@ -412,7 +412,8 @@ void handleStreamWindowUpdate(
stream.writeBufMeta.length) {
updateFlowControlList(stream);
}
-stream.conn.streamManager->updateWritableStreams(stream);
+stream.conn.streamManager->updateWritableStreams(
+stream, getSendConnFlowControlBytesWire(stream.conn) > 0);
if (stream.conn.qLogger) {
stream.conn.qLogger->addTransportStateUpdate(
getRxStreamWU(stream.id, packetNum, maximumData));
@@ -428,6 +429,7 @@ void handleConnWindowUpdate(
PacketNum packetNum) {
if (conn.flowControlState.peerAdvertisedMaxOffset <= frame.maximumData) {
conn.flowControlState.peerAdvertisedMaxOffset = frame.maximumData;
+conn.streamManager->onMaxData();
if (conn.qLogger) {
conn.qLogger->addTransportStateUpdate(
getRxConnWU(packetNum, frame.maximumData));

View File

@@ -177,6 +177,15 @@ class QuicLossFunctionsTest : public TestWithParam<PacketNumberSpace> {
return MockLegacyObserver::getLossPacketMatcher(
packetNum, lossByReorder, lossByTimeout);
}
StreamId writeQueueContains(QuicConnectionStateBase& conn, StreamId id) {
auto oldWriteQueue = conn.streamManager->oldWriteQueue();
if (oldWriteQueue) {
return oldWriteQueue->count(id);
}
return conn.streamManager->writeQueue().contains(
PriorityQueue::Identifier::fromStreamID(id));
}
};
LossVisitor testingLossMarkFunc(std::vector<PacketNum>& lostPackets) {
@@ -2626,7 +2635,7 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
ASSERT_EQ(stream->streamLossCount, 1);
EXPECT_FALSE(stream->hasWritableBufMeta());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id));
-EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id));
+EXPECT_TRUE(writeQueueContains(*conn, stream->id));
// Lose the 3rd dsr packet:
RegularQuicWritePacket packet3(PacketHeader(ShortHeader(
@@ -2646,7 +2655,7 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
EXPECT_TRUE(conn->streamManager->hasLoss());
EXPECT_FALSE(stream->hasWritableBufMeta());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id));
-EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id));
+EXPECT_TRUE(writeQueueContains(*conn, stream->id));
// Lose the 3rd dsr packet, it should be merged together with the first
// element in the lossBufMetas:
@@ -2667,7 +2676,7 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
EXPECT_TRUE(conn->streamManager->hasLoss());
EXPECT_FALSE(stream->hasWritableBufMeta());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id));
-EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id));
+EXPECT_TRUE(writeQueueContains(*conn, stream->id));
}
TEST_F(QuicLossFunctionsTest, TestReorderingThresholdDSRNormal) {

View File

@@ -7,6 +7,7 @@
#include <quic/logging/QLogger.h>
#include <quic/priority/HTTPPriorityQueue.h>
+#include <quic/state/QuicPriorityQueue.h>
#include <quic/state/QuicStreamManager.h>
#include <quic/state/QuicStreamUtilities.h>
#include <quic/state/QuicTransportStatsCallback.h>
@@ -107,7 +108,11 @@ static LocalErrorCode openLocalStreamIfNotClosed(
void QuicStreamManager::setWriteQueueMaxNextsPerStream(
uint64_t maxNextsPerStream) {
-writeQueue_.setMaxNextsPerStream(maxNextsPerStream);
+if (oldWriteQueue_) {
+oldWriteQueue_->setMaxNextsPerStream(maxNextsPerStream);
+}
+dynamic_cast<HTTPPriorityQueue&>(writeQueue())
+.advanceAfterNext(maxNextsPerStream);
}
bool QuicStreamManager::streamExists(StreamId streamId) {
@@ -229,21 +234,35 @@ bool QuicStreamManager::consumeMaxLocalUnidirectionalStreamIdIncreased() {
return res;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicStreamManager::setPriorityQueue(std::unique_ptr<PriorityQueue> queue) {
if (oldWriteQueue_) {
LOG(ERROR) << "Cannot change priority queue when the old queue is in use";
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
if (!writeQueue().empty()) {
LOG(ERROR) << "Cannot change priority queue when the queue is not empty";
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
writeQueue_ = std::move(queue);
return folly::unit;
}
bool QuicStreamManager::setStreamPriority(
StreamId id,
const PriorityQueue::Priority& newPriority,
+bool connFlowControlOpen,
const std::shared_ptr<QLogger>& qLogger) {
auto stream = findStream(id);
if (stream) {
-static const HTTPPriorityQueue kPriorityQueue;
-if (kPriorityQueue.equalPriority(stream->priority, newPriority)) {
+if (writeQueue().equalPriority(stream->priority, newPriority)) {
return false;
}
stream->priority = newPriority;
-updateWritableStreams(*stream);
+updateWritableStreams(*stream, connFlowControlOpen);
if (qLogger) {
qLogger->addPriorityUpdate(
-id, kPriorityQueue.toLogFields(stream->priority));
+id, writeQueue().toLogFields(stream->priority));
}
return true;
}
@@ -265,6 +284,30 @@ QuicStreamManager::refreshTransportSettings(const TransportSettings& settings) {
// Propagate the error
return folly::makeUnexpected(resultUni.error());
}
// TODO: The dependency on HTTPPriorityQueue here seems out of place in
// the long term
if (!writeQueue_) {
writeQueue_ = std::make_unique<HTTPPriorityQueue>();
}
if (!transportSettings_->useNewPriorityQueue && !oldWriteQueue_) {
if (writeQueue_->empty() && connFlowControlBlocked_.empty()) {
oldWriteQueue_ = std::make_unique<deprecated::PriorityQueue>();
} else {
return folly::makeUnexpected(QuicError(
QuicErrorCode(LocalErrorCode::INTERNAL_ERROR),
"Cannot change priority queue when the queue is not empty"));
}
} else if (transportSettings_->useNewPriorityQueue && oldWriteQueue_) {
if (oldWriteQueue_->empty()) {
oldWriteQueue_.reset();
} else {
return folly::makeUnexpected(QuicError(
QuicErrorCode(LocalErrorCode::INTERNAL_ERROR),
"Cannot change to new priority queue when the queue is not empty"));
}
} // else no change
return folly::unit;
}
@@ -742,7 +785,7 @@ folly::Expected<folly::Unit, QuicError> QuicStreamManager::removeClosedStream(
windowUpdates_.erase(streamId);
stopSendingStreams_.erase(streamId);
flowControlUpdated_.erase(streamId);
+connFlowControlBlocked_.erase(streamId);
// Adjust control stream count if needed
if (it->second.isControl) {
DCHECK_GT(numControlStreams_, 0);
@@ -838,7 +881,9 @@ void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) {
}
}
-void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
+void QuicStreamManager::updateWritableStreams(
+QuicStreamState& stream,
+bool connFlowControlOpen) {
// Check for terminal write errors first
if (stream.streamWriteError.has_value() && !stream.reliableSizeToPeer) {
CHECK(stream.lossBuffer.empty());
@@ -850,7 +895,8 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
// Check if paused
// pausedButDisabled adds a hard dep on writeQueue being an HTTPPriorityQueue.
auto httpPri = HTTPPriorityQueue::Priority(stream.priority);
-if (httpPri->paused && !transportSettings_->disablePausedPriority) {
+if (oldWriteQueue_ && httpPri->paused &&
+!transportSettings_->disablePausedPriority) {
removeWritable(stream);
return;
}
@@ -878,23 +924,39 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
}
// Update the actual scheduling queues (PriorityQueue or control set)
-if (stream.hasSchedulableData() || stream.hasSchedulableDsr()) {
+connFlowControlOpen |= bool(oldWriteQueue_);
+if (stream.hasSchedulableData(connFlowControlOpen) ||
+stream.hasSchedulableDsr(connFlowControlOpen)) {
if (stream.isControl) {
controlWriteQueue_.emplace(stream.id);
} else {
-const static deprecated::Priority kPausedDisabledPriority(7, true);
-auto oldPri = httpPri->paused
-? kPausedDisabledPriority
-: deprecated::Priority(
-httpPri->urgency, httpPri->incremental, httpPri->order);
-writeQueue_.insertOrUpdate(stream.id, oldPri);
+if (oldWriteQueue_) {
+const static deprecated::Priority kPausedDisabledPriority(7, true);
+auto oldPri = httpPri->paused
+? kPausedDisabledPriority
+: deprecated::Priority(
+httpPri->urgency, httpPri->incremental, httpPri->order);
+oldWriteQueue_->insertOrUpdate(stream.id, oldPri);
+} else {
+const static PriorityQueue::Priority kPausedDisabledPriority(
+HTTPPriorityQueue::Priority(7, true));
+writeQueue().insertOrUpdate(
+PriorityQueue::Identifier::fromStreamID(stream.id),
+httpPri->paused && transportSettings_->disablePausedPriority
+? kPausedDisabledPriority
+: stream.priority);
+}
}
} else {
// Not schedulable, remove from queues
if (stream.isControl) {
controlWriteQueue_.erase(stream.id);
} else {
-writeQueue_.erase(stream.id);
+if (oldWriteQueue_) {
+oldWriteQueue_->erase(stream.id);
+} else {
+writeQueue().erase(PriorityQueue::Identifier::fromStreamID(stream.id));
+}
}
}
}
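refreshTransportSettings is now the single switch point between the two queue implementations, and it refuses to swap while either queue still holds streams. A hedged usage sketch (assumes an otherwise idle stream manager and default-constructed settings):

TransportSettings settings;
settings.useNewPriorityQueue = true; // opt in to the new queue
auto result = streamManager.refreshTransportSettings(settings);
if (result.hasError()) {
  // A queue was non-empty; the previous implementation stays in use.
}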

View File

@@ -16,6 +16,7 @@
#include <quic/state/StreamData.h>
#include <quic/state/TransportSettings.h>
#include <numeric>
+#include <set>
namespace quic {
class QLogger;
@@ -218,6 +219,7 @@ class QuicStreamManager {
unidirectionalReadableStreams_ =
std::move(other.unidirectionalReadableStreams_);
peekableStreams_ = std::move(other.peekableStreams_);
+oldWriteQueue_ = std::move(other.oldWriteQueue_);
writeQueue_ = std::move(other.writeQueue_);
controlWriteQueue_ = std::move(other.controlWriteQueue_);
writableStreams_ = std::move(other.writableStreams_);
@@ -309,7 +311,9 @@ class QuicStreamManager {
/*
* Update the current writable streams for the given stream state.
*/
-void updateWritableStreams(QuicStreamState& stream);
+void updateWritableStreams(
+QuicStreamState& stream,
+bool connFlowControlOpen = true);
/*
* Find an open and active (we have created state for it) stream and return its
@@ -442,10 +446,7 @@ class QuicStreamManager {
}
folly::Expected<folly::Unit, LocalErrorCode> setPriorityQueue(
-std::unique_ptr<PriorityQueue>) {
-LOG(ERROR) << "setPriorityQueue is not supported yet";
-return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
-}
+std::unique_ptr<PriorityQueue> queue);
/**
* Update stream priority if the stream indicated by id exists.
@@ -453,6 +454,7 @@ class QuicStreamManager {
bool setStreamPriority(
StreamId id,
const PriorityQueue::Priority& priority,
+bool connFlowControlOpen = true,
const std::shared_ptr<QLogger>& qLogger = nullptr);
auto& writableDSRStreams() {
@@ -464,11 +466,16 @@ class QuicStreamManager {
}
auto& writeQueue() {
-return writeQueue_;
+return *writeQueue_;
}
+auto* oldWriteQueue() {
+return oldWriteQueue_.get();
+}
bool hasWritable() const {
-return !writeQueue_.empty() || !controlWriteQueue_.empty();
+return (oldWriteQueue_ && !oldWriteQueue_->empty()) ||
+!writeQueue_->empty() || !controlWriteQueue_.empty();
}
[[nodiscard]] bool hasDSRWritable() const {
@@ -483,7 +490,12 @@ class QuicStreamManager {
if (stream.isControl) {
controlWriteQueue_.erase(stream.id);
} else {
-writeQueue_.erase(stream.id);
+if (oldWriteQueue_) {
+oldWriteQueue()->erase(stream.id);
+} else {
+writeQueue().erase(PriorityQueue::Identifier::fromStreamID(stream.id));
+connFlowControlBlocked_.erase(stream.id);
+}
}
writableStreams_.erase(stream.id);
writableDSRStreams_.erase(stream.id);
@@ -494,7 +506,10 @@ class QuicStreamManager {
void clearWritable() {
writableStreams_.clear();
writableDSRStreams_.clear();
-writeQueue_.clear();
+if (oldWriteQueue_) {
+oldWriteQueue()->clear();
+}
+writeQueue().clear();
controlWriteQueue_.clear();
}
@@ -799,6 +814,25 @@ class QuicStreamManager {
void setWriteQueueMaxNextsPerStream(uint64_t maxNextsPerStream);
void addConnFCBlockedStream(StreamId id) {
if (!oldWriteQueue_) {
connFlowControlBlocked_.insert(id);
}
}
void onMaxData() {
if (!oldWriteQueue_) {
for (auto id : connFlowControlBlocked_) {
auto stream = findStream(id);
if (stream) {
writeQueue().insertOrUpdate(
PriorityQueue::Identifier::fromStreamID(id), stream->priority);
}
}
connFlowControlBlocked_.clear();
}
}
private:
void updateAppIdleState();
@@ -872,13 +906,20 @@ class QuicStreamManager {
folly::F14FastMap<StreamId, ApplicationErrorCode> stopSendingStreams_;
folly::F14FastSet<StreamId> windowUpdates_;
folly::F14FastSet<StreamId> flowControlUpdated_;
// Streams that were removed from the write queue because they are blocked
// on connection flow control.
folly::F14FastSet<StreamId> connFlowControlBlocked_;
// Streams that have bytes in loss buffer
folly::F14FastSet<StreamId> lossStreams_;
folly::F14FastSet<StreamId> lossDSRStreams_;
folly::F14FastSet<StreamId> readableStreams_;
folly::F14FastSet<StreamId> unidirectionalReadableStreams_;
folly::F14FastSet<StreamId> peekableStreams_;
-deprecated::PriorityQueue writeQueue_;
+std::unique_ptr<PriorityQueue> writeQueue_;
+std::unique_ptr<deprecated::PriorityQueue> oldWriteQueue_;
std::set<StreamId> controlWriteQueue_;
folly::F14FastSet<StreamId> writableStreams_;
folly::F14FastSet<StreamId> writableDSRStreams_;
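connFlowControlBlocked_ implements the behavior called out in the summary: with the new queue, a stream whose fresh data is gated only by connection flow control is erased from writeQueue() and parked in this set, then re-inserted at its stored priority once more flow control arrives. A condensed sketch of the round trip (the scheduler-side call site is assumed, not shown in this diff):

// Stream hits the connection flow control limit: update the queues with the
// flag lowered, then park the stream.
streamManager.updateWritableStreams(stream, /*connFlowControlOpen=*/false);
streamManager.addConnFCBlockedStream(stream.id);
// A later MAX_DATA frame raises the limit (see handleConnWindowUpdate):
streamManager.onMaxData(); // re-inserts every parked stream by priority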

View File

@@ -524,10 +524,11 @@ struct QuicStreamState : public QuicStreamLike {
// If the stream has writable data that's not backed by DSR. That is, in a
// regular stream write, it will be able to write something. So it either
// needs to have data in the pendingWrites chain, or it has EOF to send.
-bool hasWritableData() const {
+bool hasWritableData(bool connFlowControlOpen = true) const {
if (!pendingWrites.empty()) {
CHECK_GE(flowControlState.peerAdvertisedMaxOffset, currentWriteOffset);
-return flowControlState.peerAdvertisedMaxOffset - currentWriteOffset > 0;
+return connFlowControlOpen &&
+flowControlState.peerAdvertisedMaxOffset - currentWriteOffset > 0;
}
if (finalWriteOffset) {
// We can only write a FIN with a non-DSR stream frame if there's no
@@ -539,21 +540,22 @@ struct QuicStreamState : public QuicStreamLike {
}
// Whether this stream has non-DSR data in the write buffer or loss buffer.
-[[nodiscard]] bool hasSchedulableData() const {
-return hasWritableData() || !lossBuffer.empty();
+[[nodiscard]] bool hasSchedulableData(bool connFlowControlOpen = true) const {
+return hasWritableData(connFlowControlOpen) || !lossBuffer.empty();
}
-[[nodiscard]] bool hasSchedulableDsr() const {
-return hasWritableBufMeta() || !lossBufMetas.empty();
+[[nodiscard]] bool hasSchedulableDsr(bool connFlowControlOpen = true) const {
+return hasWritableBufMeta(connFlowControlOpen) || !lossBufMetas.empty();
}
-[[nodiscard]] bool hasWritableBufMeta() const {
+[[nodiscard]] bool hasWritableBufMeta(bool connFlowControlOpen = true) const {
if (writeBufMeta.offset == 0) {
return false;
}
if (writeBufMeta.length > 0) {
CHECK_GE(flowControlState.peerAdvertisedMaxOffset, writeBufMeta.offset);
-return flowControlState.peerAdvertisedMaxOffset - writeBufMeta.offset > 0;
+return connFlowControlOpen &&
+flowControlState.peerAdvertisedMaxOffset - writeBufMeta.offset > 0;
}
if (finalWriteOffset) {
return writeBufMeta.offset <= *finalWriteOffset;
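The net effect of the connFlowControlOpen parameter: fresh, never-sent data is schedulable only while connection flow control is open, whereas loss-buffer data (and a bare FIN) stays schedulable regardless. A standalone mirror of that gating logic (an illustrative free function, not part of the real type):

// Fresh data is connection-flow-control gated; retransmittable loss data is not.
bool schedulable(bool hasFreshData, bool hasLossData, bool connFcOpen) {
  return (hasFreshData && connFcOpen) || hasLossData;
}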

View File

@@ -463,6 +463,9 @@ struct TransportSettings {
// Support "paused" requests which buffer on the server without streaming back
// to the client.
bool disablePausedPriority{false};
+// Use the new priority queue and scheduling implementation
+bool useNewPriorityQueue{false};
};
} // namespace quic