1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-01 01:44:22 +03:00

Use a single queue for scheduling DSR and non-DSR streams.

Summary:
The write loop functions for DSR or non-DSR are segmented today. As such, so are the schedulers. Mirroring this, we also currently store the DSR and non-DSR streams in separate write queues. This makes it impossible to effectively balance between the two without potential priority inversions or starvation.

Combining them into a single queue eliminates this possibility, but is not entirely straightforward.

The main difficulty comes from the schedulers. The `StreamFrameScheduler` for non-DSR data essentially loops over the control stream queue and the normal write queue looking for the next stream to write to a given packet. When the queues are segmented things are nice and easy. When they are combined, we have to deal with the potential that the non-DSR scheduler will hit a stream with only DSR data. Simply bailing isn't quite correct, since it will just cause an empty write loop. To fix that we need check, after we are finished writing a packet, if the next scheduled stream only has DSR data. If it does, we need to ensure `hasPendingData()` returns false.

The same needs to be done in reverse for the DSR stream scheduler.

The last major compication is that we need another loop which wraps the two individual write loop functions, and calls both functions until the packet limit is exhausted or there's no more data to write. This is to handle the case where there are, for example, two active streams with the same incremental priority, and one is DSR and the other is not. In this case each write loop we want to write `packetLimit` packets, flip flopping between DSR and non DSR packets.

This kind of round robining is pathologically bad for DSR, and a future diff will experiment with changing the round robin behavior such that we write a minimum number of packets per stream before moving on to the next stream.

This change also contains some other refactors, such as eliminating `updateLossStreams` from the stream manager.

(Note: this ignores all push blocking failures!)

Reviewed By: kvtsoy

Differential Revision: D46249067

fbshipit-source-id: 56a37c02fef51908c1336266ed40ac6d99bd14d4
This commit is contained in:
Matt Joras
2023-06-01 14:11:31 -07:00
committed by Facebook GitHub Bot
parent 5b5082a3ee
commit 35a2d34843
20 changed files with 401 additions and 159 deletions

View File

@ -458,7 +458,11 @@ void StreamFrameScheduler::writeStreamsHelper(
level.iterator->begin(); level.iterator->begin();
do { do {
auto streamId = level.iterator->current(); auto streamId = level.iterator->current();
auto stream = conn_.streamManager->findStream(streamId); auto stream = CHECK_NOTNULL(conn_.streamManager->findStream(streamId));
if (!stream->hasSchedulableData() && stream->hasSchedulableDsr()) {
// We hit a DSR stream
return;
}
CHECK(stream) << "streamId=" << streamId CHECK(stream) << "streamId=" << streamId
<< "inc=" << uint64_t(level.incremental); << "inc=" << uint64_t(level.incremental);
if (!writeSingleStream(builder, *stream, connWritableBytes)) { if (!writeSingleStream(builder, *stream, connWritableBytes)) {
@ -476,30 +480,40 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
DCHECK(conn_.streamManager->hasWritable()); DCHECK(conn_.streamManager->hasWritable());
uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_); uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
// Write the control streams first as a naive binary priority mechanism. // Write the control streams first as a naive binary priority mechanism.
const auto& writableControlStreams = const auto& controlWriteQueue = conn_.streamManager->controlWriteQueue();
conn_.streamManager->writableControlStreams(); if (!controlWriteQueue.empty()) {
if (!writableControlStreams.empty()) {
conn_.schedulingState.nextScheduledControlStream = writeStreamsHelper( conn_.schedulingState.nextScheduledControlStream = writeStreamsHelper(
builder, builder,
writableControlStreams, controlWriteQueue,
conn_.schedulingState.nextScheduledControlStream, conn_.schedulingState.nextScheduledControlStream,
connWritableBytes, connWritableBytes,
conn_.transportSettings.streamFramePerPacket); conn_.transportSettings.streamFramePerPacket);
} }
auto& writableStreams = conn_.streamManager->writableStreams(); auto& writeQueue = conn_.streamManager->writeQueue();
if (!writableStreams.empty()) { if (!writeQueue.empty()) {
writeStreamsHelper( writeStreamsHelper(
builder, builder,
writableStreams, writeQueue,
connWritableBytes, connWritableBytes,
conn_.transportSettings.streamFramePerPacket); conn_.transportSettings.streamFramePerPacket);
// If the next non-control stream is DSR, record that fact in the scheduler
// so that we don't try to write a non DSR stream again. Note that this
// means that in the presence of many large control streams and DSR
// streams, we won't completely prioritize control streams but they
// will not be starved.
auto streamId = writeQueue.getNextScheduledStream();
auto stream = conn_.streamManager->findStream(streamId);
if (stream && !stream->hasSchedulableData()) {
nextStreamDsr_ = true;
}
} }
} // namespace quic }
bool StreamFrameScheduler::hasPendingData() const { bool StreamFrameScheduler::hasPendingData() const {
return conn_.streamManager->hasNonDSRLoss() || return !nextStreamDsr_ &&
(conn_.streamManager->hasNonDSRWritable() && (conn_.streamManager->hasNonDSRLoss() ||
getSendConnFlowControlBytesWire(conn_) > 0); (conn_.streamManager->hasNonDSRWritable() &&
getSendConnFlowControlBytesWire(conn_) > 0));
} }
bool StreamFrameScheduler::writeStreamFrame( bool StreamFrameScheduler::writeStreamFrame(

View File

@ -118,6 +118,7 @@ class StreamFrameScheduler {
uint64_t& connWritableBytes); uint64_t& connWritableBytes);
QuicConnectionStateBase& conn_; QuicConnectionStateBase& conn_;
bool nextStreamDsr_{false};
}; };
class AckScheduler { class AckScheduler {

View File

@ -675,7 +675,6 @@ void updateConnection(
stream->newStreamBytesSent += writeStreamFrame.len; stream->newStreamBytesSent += writeStreamFrame.len;
} }
conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->updateLossStreams(*stream);
streamBytesSent += writeStreamFrame.len; streamBytesSent += writeStreamFrame.len;
detailsPerStream.addFrame(writeStreamFrame, newStreamDataWritten); detailsPerStream.addFrame(writeStreamFrame, newStreamDataWritten);
break; break;

View File

@ -1218,8 +1218,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) {
false); false);
scheduler.writeStreams(builder); scheduler.writeStreams(builder);
EXPECT_EQ( EXPECT_EQ(
conn.streamManager->writableStreams().getNextScheduledStream( conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
kDefaultPriority),
0); 0);
} }
@ -1266,8 +1265,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) {
// Force the wraparound initially. // Force the wraparound initially.
scheduler.writeStreams(builder); scheduler.writeStreams(builder);
EXPECT_EQ( EXPECT_EQ(
conn.streamManager->writableStreams().getNextScheduledStream( conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
kDefaultPriority),
4); 4);
// Should write frames for stream2, stream3, followed by stream1 again. // Should write frames for stream2, stream3, followed by stream1 again.
@ -1333,8 +1331,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) {
// The default is to wraparound initially. // The default is to wraparound initially.
scheduler.writeStreams(builder1); scheduler.writeStreams(builder1);
EXPECT_EQ( EXPECT_EQ(
conn.streamManager->writableStreams().getNextScheduledStream( conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
kDefaultPriority),
stream2); stream2);
// Should write frames for stream2, stream3, followed by stream1 again. // Should write frames for stream2, stream3, followed by stream1 again.
@ -1361,6 +1358,97 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) {
EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3); EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3);
} }
TEST_F(
QuicPacketSchedulerTest,
StreamFrameSchedulerRoundRobinStreamPerPacketHitsDsr) {
QuicClientConnectionState conn(
FizzClientQuicHandshakeContext::Builder().build());
conn.streamManager->setMaxLocalBidirectionalStreams(10);
conn.flowControlState.peerAdvertisedMaxOffset = 100000;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000;
conn.transportSettings.streamFramePerPacket = true;
auto connId = getTestConnectionId();
StreamFrameScheduler scheduler(conn);
ShortHeader shortHeader1(
ProtectionType::KeyPhaseZero,
connId,
getNextPacketNum(conn, PacketNumberSpace::AppData));
RegularQuicPacketBuilder builder1(
conn.udpSendPacketLen,
std::move(shortHeader1),
conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0));
auto stream1 =
conn.streamManager->createNextBidirectionalStream().value()->id;
auto stream2 =
conn.streamManager->createNextBidirectionalStream().value()->id;
auto stream3 =
conn.streamManager->createNextBidirectionalStream().value()->id;
auto stream4 =
conn.streamManager->createNextBidirectionalStream().value()->id;
auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 2, 4096);
auto curBuf = largeBuf.get();
do {
curBuf->append(curBuf->capacity());
curBuf = curBuf->next();
} while (curBuf != largeBuf.get());
writeDataToQuicStream(
*conn.streamManager->findStream(stream1), std::move(largeBuf), false);
writeDataToQuicStream(
*conn.streamManager->findStream(stream2),
folly::IOBuf::copyBuffer("some data"),
false);
writeDataToQuicStream(
*conn.streamManager->findStream(stream3),
folly::IOBuf::copyBuffer("some data"),
false);
auto sender = std::make_unique<MockDSRPacketizationRequestSender>();
ON_CALL(*sender, addSendInstruction(testing::_))
.WillByDefault(testing::Return(true));
ON_CALL(*sender, flush()).WillByDefault(testing::Return(true));
auto dsrStream = conn.streamManager->findStream(stream4);
dsrStream->dsrSender = std::move(sender);
BufferMeta bufMeta(20);
writeDataToQuicStream(
*conn.streamManager->findStream(stream4),
folly::IOBuf::copyBuffer("some data"),
false);
writeBufMetaToQuicStream(
*conn.streamManager->findStream(stream4), bufMeta, true /* eof */);
// Pretend we sent the non DSR data
dsrStream->ackedIntervals.insert(0, dsrStream->writeBuffer.chainLength() - 1);
dsrStream->currentWriteOffset = dsrStream->writeBuffer.chainLength();
dsrStream->writeBuffer.move();
conn.streamManager->updateWritableStreams(*dsrStream);
// The default is to wraparound initially.
scheduler.writeStreams(builder1);
EXPECT_EQ(
conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
stream2);
// Should write frames for stream2, stream3, followed by an empty write.
NiceMock<MockQuicPacketBuilder> builder2;
EXPECT_CALL(builder2, remainingSpaceInPkt()).WillRepeatedly(Return(4096));
EXPECT_CALL(builder2, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) {
builder2.frames_.push_back(f);
}));
auto& frames = builder2.frames_;
ASSERT_TRUE(scheduler.hasPendingData());
scheduler.writeStreams(builder2);
ASSERT_EQ(frames.size(), 1);
ASSERT_TRUE(scheduler.hasPendingData());
scheduler.writeStreams(builder2);
ASSERT_EQ(frames.size(), 2);
EXPECT_FALSE(scheduler.hasPendingData());
scheduler.writeStreams(builder2);
WriteStreamFrame f1(stream2, 0, 9, false);
WriteStreamFrame f2(stream3, 0, 9, false);
ASSERT_TRUE(frames[0].asWriteStreamFrame());
EXPECT_EQ(*frames[0].asWriteStreamFrame(), f1);
ASSERT_TRUE(frames[1].asWriteStreamFrame());
EXPECT_EQ(*frames[1].asWriteStreamFrame(), f2);
}
TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) { TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
QuicClientConnectionState conn( QuicClientConnectionState conn(
FizzClientQuicHandshakeContext::Builder().build()); FizzClientQuicHandshakeContext::Builder().build());
@ -1406,7 +1494,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
// The default is to wraparound initially. // The default is to wraparound initially.
scheduler.writeStreams(builder1); scheduler.writeStreams(builder1);
EXPECT_EQ( EXPECT_EQ(
conn.streamManager->writableStreams().getNextScheduledStream( conn.streamManager->writeQueue().getNextScheduledStream(
Priority(0, false)), Priority(0, false)),
stream1); stream1);
@ -1473,7 +1561,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) {
// The default is to wraparound initially. // The default is to wraparound initially.
scheduler.writeStreams(builder1); scheduler.writeStreams(builder1);
EXPECT_EQ( EXPECT_EQ(
conn.streamManager->writableStreams().getNextScheduledStream( conn.streamManager->writeQueue().getNextScheduledStream(
Priority(0, false)), Priority(0, false)),
stream1); stream1);
@ -1550,8 +1638,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) {
// The default is to wraparound initially. // The default is to wraparound initially.
scheduler.writeStreams(builder); scheduler.writeStreams(builder);
EXPECT_EQ( EXPECT_EQ(
conn.streamManager->writableStreams().getNextScheduledStream( conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
kDefaultPriority),
stream3); stream3);
EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2); EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2);
@ -1578,8 +1665,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) {
EXPECT_EQ(*frames[3].asWriteStreamFrame(), f4); EXPECT_EQ(*frames[3].asWriteStreamFrame(), f4);
EXPECT_EQ( EXPECT_EQ(
conn.streamManager->writableStreams().getNextScheduledStream( conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
kDefaultPriority),
stream3); stream3);
EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2); EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2);
} }
@ -1605,8 +1691,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) {
writeDataToQuicStream(*stream1, folly::IOBuf::copyBuffer("some data"), false); writeDataToQuicStream(*stream1, folly::IOBuf::copyBuffer("some data"), false);
scheduler.writeStreams(builder); scheduler.writeStreams(builder);
EXPECT_EQ( EXPECT_EQ(
conn.streamManager->writableStreams().getNextScheduledStream( conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
kDefaultPriority),
0); 0);
} }
@ -1644,7 +1729,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRemoveOne) {
// Manually remove a stream and set the next scheduled to that stream. // Manually remove a stream and set the next scheduled to that stream.
builder.frames_.clear(); builder.frames_.clear();
conn.streamManager->writableStreams().setNextScheduledStream(stream2); conn.streamManager->writeQueue().setNextScheduledStream(stream2);
conn.streamManager->removeWritable(*conn.streamManager->findStream(stream2)); conn.streamManager->removeWritable(*conn.streamManager->findStream(stream2));
scheduler.writeStreams(builder); scheduler.writeStreams(builder);
ASSERT_EQ(builder.frames_.size(), 1); ASSERT_EQ(builder.frames_.size(), 1);
@ -1817,7 +1902,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) {
stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0])); stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0]));
stream->retransmissionBuffer.clear(); stream->retransmissionBuffer.clear();
conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->updateLossStreams(*stream);
EXPECT_TRUE(scheduler.hasPendingData()); EXPECT_TRUE(scheduler.hasPendingData());
// Write again // Write again
@ -1862,7 +1946,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) {
dsrStream->insertIntoLossBufMeta(bufMeta); dsrStream->insertIntoLossBufMeta(bufMeta);
conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->updateWritableStreams(*dsrStream); conn.streamManager->updateWritableStreams(*dsrStream);
conn.streamManager->updateLossStreams(*dsrStream);
StreamFrameScheduler scheduler(conn); StreamFrameScheduler scheduler(conn);
EXPECT_TRUE(scheduler.hasPendingData()); EXPECT_TRUE(scheduler.hasPendingData());
@ -1931,7 +2014,6 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) {
stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0])); stream->lossBuffer.emplace_back(std::move(*stream->retransmissionBuffer[0]));
stream->retransmissionBuffer.clear(); stream->retransmissionBuffer.clear();
conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->updateLossStreams(*stream);
EXPECT_TRUE(scheduler.hasPendingData()); EXPECT_TRUE(scheduler.hasPendingData());
// Write again // Write again

View File

@ -165,7 +165,6 @@ void dropPackets(QuicServerConnectionState& conn) {
std::move(*itr->second)); std::move(*itr->second));
stream->retransmissionBuffer.erase(itr); stream->retransmissionBuffer.erase(itr);
conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->updateLossStreams(*stream);
} }
} }
conn.outstandings.reset(); conn.outstandings.reset();
@ -295,7 +294,6 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLoss) {
} while (curBuf != largeBuf.get()); } while (curBuf != largeBuf.get());
lossStreamState->lossBuffer.emplace_back(std::move(largeBuf), 31, false); lossStreamState->lossBuffer.emplace_back(std::move(largeBuf), 31, false);
conn.streamManager->updateWritableStreams(*lossStreamState); conn.streamManager->updateWritableStreams(*lossStreamState);
conn.streamManager->updateLossStreams(*lossStreamState);
transport_->writeChain( transport_->writeChain(
stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0);
@ -4262,7 +4260,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) {
conn.outstandings.reset(); conn.outstandings.reset();
// Start from stream2 instead of stream1 // Start from stream2 instead of stream1
conn.streamManager->writableStreams().setNextScheduledStream(s2); conn.streamManager->writeQueue().setNextScheduledStream(s2);
writableBytes = kDefaultUDPSendPacketLen - 100; writableBytes = kDefaultUDPSendPacketLen - 100;
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
@ -4287,7 +4285,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) {
conn.outstandings.reset(); conn.outstandings.reset();
// Test wrap around // Test wrap around
conn.streamManager->writableStreams().setNextScheduledStream(s2); conn.streamManager->writeQueue().setNextScheduledStream(s2);
writableBytes = kDefaultUDPSendPacketLen; writableBytes = kDefaultUDPSendPacketLen;
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
writeQuicDataToSocket( writeQuicDataToSocket(

View File

@ -731,8 +731,8 @@ void overridePacketWithToken(
} }
bool writableContains(QuicStreamManager& streamManager, StreamId streamId) { bool writableContains(QuicStreamManager& streamManager, StreamId streamId) {
return streamManager.writableStreams().count(streamId) > 0 || return streamManager.writeQueue().count(streamId) > 0 ||
streamManager.writableControlStreams().count(streamId) > 0; streamManager.controlWriteQueue().count(streamId) > 0;
} }
std::unique_ptr<PacketNumberCipher> std::unique_ptr<PacketNumberCipher>

View File

@ -140,6 +140,11 @@ TEST_F(DSRMultiWriteTest, TwoRequestsWithLoss) {
prepareFlowControlAndStreamLimit(); prepareFlowControlAndStreamLimit();
auto streamId = prepareOneStream(1000); auto streamId = prepareOneStream(1000);
auto stream = conn_.streamManager->findStream(streamId); auto stream = conn_.streamManager->findStream(streamId);
// Pretend we sent the non DSR data
stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1);
stream->currentWriteOffset = stream->writeBuffer.chainLength();
stream->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream);
auto bufMetaStartingOffset = stream->writeBufMeta.offset; auto bufMetaStartingOffset = stream->writeBufMeta.offset;
// Move part of the BufMetas to lossBufMetas // Move part of the BufMetas to lossBufMetas
auto split = stream->writeBufMeta.split(500); auto split = stream->writeBufMeta.split(500);

View File

@ -18,9 +18,34 @@ DSRStreamFrameScheduler::DSRStreamFrameScheduler(
: conn_(conn) {} : conn_(conn) {}
bool DSRStreamFrameScheduler::hasPendingData() const { bool DSRStreamFrameScheduler::hasPendingData() const {
return conn_.streamManager->hasDSRLoss() || return !nextStreamNonDsr_ &&
(conn_.streamManager->hasDSRWritable() && (conn_.streamManager->hasDSRLoss() ||
getSendConnFlowControlBytesWire(conn_) > 0); (conn_.streamManager->hasDSRWritable() &&
getSendConnFlowControlBytesWire(conn_) > 0));
}
DSRStreamFrameScheduler::SchedulingResult
DSRStreamFrameScheduler::enrichAndAddSendInstruction(
uint32_t encodedSize,
DSRStreamFrameScheduler::SchedulingResult result,
DSRPacketBuilderBase& packetBuilder,
SendInstruction::Builder& instructionBuilder,
const PriorityQueue& writeQueue,
const PriorityQueue::LevelItr& levelIter,
QuicStreamState& stream) {
enrichInstruction(instructionBuilder, stream);
packetBuilder.addSendInstruction(
instructionBuilder.build(), encodedSize, stream.streamPacketIdx++);
result.writeSuccess = true;
result.sender = stream.dsrSender.get();
levelIter->iterator->next();
auto nextStreamId = writeQueue.getNextScheduledStream();
auto nextStream =
CHECK_NOTNULL(conn_.streamManager->findStream(nextStreamId));
if (nextStream->hasSchedulableData()) {
nextStreamNonDsr_ = true;
}
return result;
} }
/** /**
@ -32,22 +57,25 @@ bool DSRStreamFrameScheduler::hasPendingData() const {
DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream(
DSRPacketBuilderBase& builder) { DSRPacketBuilderBase& builder) {
SchedulingResult result; SchedulingResult result;
auto& writableDSRStreams = conn_.streamManager->writableDSRStreams(); auto& writeQueue = conn_.streamManager->writeQueue();
const auto& levelIter = std::find_if( const auto& levelIter = std::find_if(
writableDSRStreams.levels.cbegin(), writeQueue.levels.cbegin(),
writableDSRStreams.levels.cend(), writeQueue.levels.cend(),
[&](const auto& level) { return !level.empty(); }); [&](const auto& level) { return !level.empty(); });
if (levelIter == writableDSRStreams.levels.cend()) { if (levelIter == writeQueue.levels.cend()) {
return result; return result;
} }
levelIter->iterator->begin(); levelIter->iterator->begin();
auto streamId = levelIter->iterator->current(); auto streamId = levelIter->iterator->current();
auto stream = conn_.streamManager->findStream(streamId); auto stream = conn_.streamManager->findStream(streamId);
CHECK(stream); CHECK(stream);
CHECK(stream->dsrSender); if (!stream->dsrSender || !stream->hasSchedulableDsr()) {
nextStreamNonDsr_ = true;
return result;
}
bool hasFreshBufMeta = stream->writeBufMeta.length > 0; bool hasFreshBufMeta = stream->writeBufMeta.length > 0;
bool hasLossBufMeta = !stream->lossBufMetas.empty(); bool hasLossBufMeta = !stream->lossBufMetas.empty();
CHECK(hasFreshBufMeta || hasLossBufMeta); CHECK(stream->hasSchedulableDsr());
if (hasLossBufMeta) { if (hasLossBufMeta) {
SendInstruction::Builder instructionBuilder(conn_, streamId); SendInstruction::Builder instructionBuilder(conn_, streamId);
auto encodedSize = writeDSRStreamFrame( auto encodedSize = writeDSRStreamFrame(
@ -64,15 +92,14 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream(
if (builder.remainingSpace() < encodedSize) { if (builder.remainingSpace() < encodedSize) {
return result; return result;
} }
enrichInstruction(instructionBuilder, *stream); return enrichAndAddSendInstruction(
builder.addSendInstruction( encodedSize,
instructionBuilder.build(), std::move(result),
/*streamEncodedSize=*/encodedSize, builder,
/*streamPacketIdx=*/stream->streamPacketIdx++); instructionBuilder,
result.writeSuccess = true; writeQueue,
result.sender = stream->dsrSender.get(); levelIter,
levelIter->iterator->next(); *stream);
return result;
} }
} }
if (!hasFreshBufMeta || builder.remainingSpace() == 0) { if (!hasFreshBufMeta || builder.remainingSpace() == 0) {
@ -109,13 +136,14 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream(
if (builder.remainingSpace() < encodedSize) { if (builder.remainingSpace() < encodedSize) {
return result; return result;
} }
enrichInstruction(instructionBuilder, *stream); return enrichAndAddSendInstruction(
builder.addSendInstruction( encodedSize,
instructionBuilder.build(), encodedSize, stream->streamPacketIdx++); std::move(result),
result.writeSuccess = true; builder,
result.sender = stream->dsrSender.get(); instructionBuilder,
levelIter->iterator->next(); writeQueue,
return result; levelIter,
*stream);
} }
return result; return result;
} }

View File

@ -37,8 +37,17 @@ class DSRStreamFrameScheduler {
void enrichInstruction( void enrichInstruction(
SendInstruction::Builder& builder, SendInstruction::Builder& builder,
const QuicStreamState& stream); const QuicStreamState& stream);
SchedulingResult enrichAndAddSendInstruction(
uint32_t,
SchedulingResult,
DSRPacketBuilderBase&,
SendInstruction::Builder&,
const PriorityQueue&,
const PriorityQueue::LevelItr&,
QuicStreamState&);
private: private:
QuicServerConnectionState& conn_; QuicServerConnectionState& conn_;
bool nextStreamNonDsr_{false};
}; };
} // namespace quic } // namespace quic

View File

@ -69,6 +69,11 @@ TEST_F(WriteFunctionsTest, WriteLoopTimeLimit) {
auto streamId = prepareOneStream(3000); auto streamId = prepareOneStream(3000);
auto cid = getTestConnectionId(); auto cid = getTestConnectionId();
auto stream = conn_.streamManager->findStream(streamId); auto stream = conn_.streamManager->findStream(streamId);
// Pretend we sent the non DSR data
stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1);
stream->currentWriteOffset = stream->writeBuffer.chainLength();
stream->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream);
auto currentBufMetaOffset = stream->writeBufMeta.offset; auto currentBufMetaOffset = stream->writeBufMeta.offset;
size_t packetLimit = 2; size_t packetLimit = 2;
conn_.lossState.srtt = 100ms; conn_.lossState.srtt = 100ms;
@ -96,6 +101,11 @@ TEST_F(WriteFunctionsTest, WriteLoopTimeLimitNoLimit) {
auto streamId = prepareOneStream(3000); auto streamId = prepareOneStream(3000);
auto cid = getTestConnectionId(); auto cid = getTestConnectionId();
auto stream = conn_.streamManager->findStream(streamId); auto stream = conn_.streamManager->findStream(streamId);
// Pretend we sent the non DSR data
stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1);
stream->currentWriteOffset = stream->writeBuffer.chainLength();
stream->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream);
auto currentBufMetaOffset = stream->writeBufMeta.offset; auto currentBufMetaOffset = stream->writeBufMeta.offset;
size_t packetLimit = 2; size_t packetLimit = 2;
conn_.lossState.srtt = 100ms; conn_.lossState.srtt = 100ms;
@ -123,6 +133,11 @@ TEST_F(WriteFunctionsTest, WriteTwoInstructions) {
prepareFlowControlAndStreamLimit(); prepareFlowControlAndStreamLimit();
auto streamId = prepareOneStream(2000); auto streamId = prepareOneStream(2000);
auto stream = conn_.streamManager->findStream(streamId); auto stream = conn_.streamManager->findStream(streamId);
// Pretend we sent the non DSR data
stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1);
stream->currentWriteOffset = stream->writeBuffer.chainLength();
stream->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream);
auto cid = getTestConnectionId(); auto cid = getTestConnectionId();
size_t packetLimit = 20; size_t packetLimit = 20;
EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_));
@ -136,6 +151,11 @@ TEST_F(WriteFunctionsTest, PacketLimit) {
prepareFlowControlAndStreamLimit(); prepareFlowControlAndStreamLimit();
auto streamId = prepareOneStream(2000 * 100); auto streamId = prepareOneStream(2000 * 100);
auto stream = conn_.streamManager->findStream(streamId); auto stream = conn_.streamManager->findStream(streamId);
// Pretend we sent the non DSR data
stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1);
stream->currentWriteOffset = stream->writeBuffer.chainLength();
stream->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream);
auto mockCongestionController = auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>(); std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get(); auto rawCongestionController = mockCongestionController.get();
@ -157,20 +177,51 @@ TEST_F(WriteFunctionsTest, WriteTwoStreams) {
auto streamId2 = prepareOneStream(1000); auto streamId2 = prepareOneStream(1000);
auto stream1 = conn_.streamManager->findStream(streamId1); auto stream1 = conn_.streamManager->findStream(streamId1);
auto stream2 = conn_.streamManager->findStream(streamId2); auto stream2 = conn_.streamManager->findStream(streamId2);
// Pretend we sent the non DSR data on second stream
stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1);
stream2->currentWriteOffset = stream2->writeBuffer.chainLength();
stream2->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream2);
auto cid = getTestConnectionId(); auto cid = getTestConnectionId();
size_t packetLimit = 20; size_t packetLimit = 20;
EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_));
EXPECT_EQ(1, stream1->retransmissionBufMetas.size()); EXPECT_EQ(1, stream1->retransmissionBufMetas.size());
EXPECT_EQ(1, stream2->retransmissionBufMetas.size()); EXPECT_EQ(1, stream2->retransmissionBufMetas.size());
// TODO: This needs to be fixed later: The stream and the sender needs to be
// 1:1 in the future. Then there will be two senders for this test case and
// each of them will send out one instruction.
EXPECT_EQ(1, countInstructions(streamId1)); EXPECT_EQ(1, countInstructions(streamId1));
EXPECT_EQ(1, countInstructions(streamId2)); EXPECT_EQ(1, countInstructions(streamId2));
EXPECT_EQ(2, conn_.outstandings.packets.size()); EXPECT_EQ(2, conn_.outstandings.packets.size());
EXPECT_TRUE(verifyAllOutstandingsAreDSR()); EXPECT_TRUE(verifyAllOutstandingsAreDSR());
} }
TEST_F(WriteFunctionsTest, WriteThreeStreamsNonDsrAndDsr) {
prepareFlowControlAndStreamLimit();
auto streamId1 = prepareOneStream(1000);
auto streamId2 = prepareOneStream(1000);
auto streamId3 = prepareOneStream(1000);
auto stream1 = conn_.streamManager->findStream(streamId1);
auto stream2 = conn_.streamManager->findStream(streamId2);
auto stream3 = conn_.streamManager->findStream(streamId3);
auto cid = getTestConnectionId();
size_t packetLimit = 20;
// First loop only write a single packet because it will find there's non-DSR
// data to write on the next stream.
EXPECT_EQ(1, writePacketizationRequest(conn_, cid, packetLimit, *aead_));
// Pretend we sent the non DSR data for last stream
stream3->ackedIntervals.insert(0, stream3->writeBuffer.chainLength() - 1);
stream3->currentWriteOffset = stream3->writeBuffer.chainLength();
stream3->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream3);
EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_));
EXPECT_EQ(1, stream1->retransmissionBufMetas.size());
EXPECT_EQ(1, stream2->retransmissionBufMetas.size());
EXPECT_EQ(1, stream3->retransmissionBufMetas.size());
EXPECT_EQ(1, countInstructions(streamId1));
EXPECT_EQ(1, countInstructions(streamId2));
EXPECT_EQ(1, countInstructions(streamId3));
EXPECT_EQ(3, conn_.outstandings.packets.size());
EXPECT_TRUE(verifyAllOutstandingsAreDSR());
}
TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) { TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) {
prepareFlowControlAndStreamLimit(); prepareFlowControlAndStreamLimit();
auto streamId1 = prepareOneStream(2000); auto streamId1 = prepareOneStream(2000);
@ -179,6 +230,11 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) {
auto stream2 = conn_.streamManager->findStream(streamId2); auto stream2 = conn_.streamManager->findStream(streamId2);
conn_.streamManager->setStreamPriority(streamId1, Priority{3, false}); conn_.streamManager->setStreamPriority(streamId1, Priority{3, false});
conn_.streamManager->setStreamPriority(streamId2, Priority{3, false}); conn_.streamManager->setStreamPriority(streamId2, Priority{3, false});
// Pretend we sent the non DSR data on first stream
stream1->ackedIntervals.insert(0, stream1->writeBuffer.chainLength() - 1);
stream1->currentWriteOffset = stream1->writeBuffer.chainLength();
stream1->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream1);
auto cid = getTestConnectionId(); auto cid = getTestConnectionId();
size_t packetLimit = 2; size_t packetLimit = 2;
EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_));
@ -198,6 +254,11 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsIncremental) {
auto stream2 = conn_.streamManager->findStream(streamId2); auto stream2 = conn_.streamManager->findStream(streamId2);
conn_.streamManager->setStreamPriority(streamId1, Priority{3, true}); conn_.streamManager->setStreamPriority(streamId1, Priority{3, true});
conn_.streamManager->setStreamPriority(streamId2, Priority{3, true}); conn_.streamManager->setStreamPriority(streamId2, Priority{3, true});
// Pretend we sent the non DSR data on second stream
stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1);
stream2->currentWriteOffset = stream2->writeBuffer.chainLength();
stream2->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream2);
auto cid = getTestConnectionId(); auto cid = getTestConnectionId();
size_t packetLimit = 2; size_t packetLimit = 2;
EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_));
@ -213,6 +274,11 @@ TEST_F(WriteFunctionsTest, LossAndFreshTwoInstructionsInTwoPackets) {
prepareFlowControlAndStreamLimit(); prepareFlowControlAndStreamLimit();
auto streamId = prepareOneStream(1000); auto streamId = prepareOneStream(1000);
auto stream = conn_.streamManager->findStream(streamId); auto stream = conn_.streamManager->findStream(streamId);
// Pretend we sent the non DSR data
stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1);
stream->currentWriteOffset = stream->writeBuffer.chainLength();
stream->writeBuffer.move();
conn_.streamManager->updateWritableStreams(*stream);
auto bufMetaStartingOffset = stream->writeBufMeta.offset; auto bufMetaStartingOffset = stream->writeBufMeta.offset;
// Move part of the BufMetas to lossBufMetas // Move part of the BufMetas to lossBufMetas
auto split = stream->writeBufMeta.split(500); auto split = stream->writeBufMeta.split(500);
@ -246,7 +312,7 @@ TEST_F(
// Move part of the BufMetas to lossBufMetas // Move part of the BufMetas to lossBufMetas
auto split = stream->writeBufMeta.split(500); auto split = stream->writeBufMeta.split(500);
stream->lossBufMetas.push_back(split); stream->lossBufMetas.push_back(split);
conn_.streamManager->updateLossStreams(*stream); conn_.streamManager->updateWritableStreams(*stream);
// Zero out conn flow control. // Zero out conn flow control.
conn_.flowControlState.sumCurWriteOffset = conn_.flowControlState.sumCurWriteOffset =
conn_.flowControlState.peerAdvertisedMaxOffset; conn_.flowControlState.peerAdvertisedMaxOffset;

View File

@ -200,7 +200,6 @@ void markPacketLoss(
stream->retransmissionBufMetas.erase(retxBufMetaItr); stream->retransmissionBufMetas.erase(retxBufMetaItr);
} }
conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->updateLossStreams(*stream);
break; break;
} }
case QuicWriteFrame::Type::WriteCryptoFrame: { case QuicWriteFrame::Type::WriteCryptoFrame: {

View File

@ -2348,7 +2348,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
ASSERT_EQ(200, retxBufMetaIter->second.length); ASSERT_EQ(200, retxBufMetaIter->second.length);
ASSERT_FALSE(retxBufMetaIter->second.eof); ASSERT_FALSE(retxBufMetaIter->second.eof);
conn->streamManager->updateWritableStreams(*stream); conn->streamManager->updateWritableStreams(*stream);
conn->streamManager->updateLossStreams(*stream);
EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_FALSE(conn->streamManager->hasLoss());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty());
@ -2370,7 +2369,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
ASSERT_EQ(400, retxBufMetaIter->second.length); ASSERT_EQ(400, retxBufMetaIter->second.length);
ASSERT_FALSE(retxBufMetaIter->second.eof); ASSERT_FALSE(retxBufMetaIter->second.eof);
conn->streamManager->updateWritableStreams(*stream); conn->streamManager->updateWritableStreams(*stream);
conn->streamManager->updateLossStreams(*stream);
EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_FALSE(conn->streamManager->hasLoss());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty());
@ -2392,7 +2390,6 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
ASSERT_EQ(400, retxBufMetaIter->second.length); ASSERT_EQ(400, retxBufMetaIter->second.length);
ASSERT_TRUE(retxBufMetaIter->second.eof); ASSERT_TRUE(retxBufMetaIter->second.eof);
conn->streamManager->updateWritableStreams(*stream); conn->streamManager->updateWritableStreams(*stream);
conn->streamManager->updateLossStreams(*stream);
EXPECT_FALSE(conn->streamManager->hasLoss()); EXPECT_FALSE(conn->streamManager->hasLoss());
EXPECT_TRUE(conn->streamManager->writableDSRStreams().empty()); EXPECT_TRUE(conn->streamManager->writableDSRStreams().empty());
@ -2411,7 +2408,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
EXPECT_EQ(2, stream->retransmissionBufMetas.size()); EXPECT_EQ(2, stream->retransmissionBufMetas.size());
EXPECT_TRUE(conn->streamManager->hasLoss()); EXPECT_TRUE(conn->streamManager->hasLoss());
ASSERT_EQ(stream->streamLossCount, 1); ASSERT_EQ(stream->streamLossCount, 1);
EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); EXPECT_FALSE(stream->hasWritableBufMeta());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id));
EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id));
// Lose the 3rd dsr packet: // Lose the 3rd dsr packet:
RegularQuicWritePacket packet3(PacketHeader(ShortHeader( RegularQuicWritePacket packet3(PacketHeader(ShortHeader(
@ -2428,7 +2427,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
EXPECT_EQ(1, stream->retransmissionBufMetas.size()); EXPECT_EQ(1, stream->retransmissionBufMetas.size());
ASSERT_EQ(stream->streamLossCount, 2); ASSERT_EQ(stream->streamLossCount, 2);
EXPECT_TRUE(conn->streamManager->hasLoss()); EXPECT_TRUE(conn->streamManager->hasLoss());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); EXPECT_FALSE(stream->hasWritableBufMeta());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id));
EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id));
// Lose the 3nd dsr packet, it should be merged together with the first // Lose the 3nd dsr packet, it should be merged together with the first
// element in the lossBufMetas: // element in the lossBufMetas:
@ -2446,7 +2447,9 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) {
EXPECT_EQ(0, stream->retransmissionBufMetas.size()); EXPECT_EQ(0, stream->retransmissionBufMetas.size());
ASSERT_EQ(stream->streamLossCount, 3); ASSERT_EQ(stream->streamLossCount, 3);
EXPECT_TRUE(conn->streamManager->hasLoss()); EXPECT_TRUE(conn->streamManager->hasLoss());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().empty()); EXPECT_FALSE(stream->hasWritableBufMeta());
EXPECT_FALSE(conn->streamManager->writableDSRStreams().contains(stream->id));
EXPECT_TRUE(conn->streamManager->writeQueue().count(stream->id));
} }
TEST_F(QuicLossFunctionsTest, TestReorderingThresholdDSRNormal) { TEST_F(QuicLossFunctionsTest, TestReorderingThresholdDSRNormal) {

View File

@ -325,13 +325,8 @@ void QuicServerTransport::writeData() {
} }
if (conn_->oneRttWriteCipher) { if (conn_->oneRttWriteCipher) {
CHECK(conn_->oneRttWriteHeaderCipher); CHECK(conn_->oneRttWriteHeaderCipher);
// This kind of sucks, but right now we don't have a way to schedule DSR
// and non-DSR streams as part of the same priority queue. This can lead
// to prioritity inversions and starving of one kind of stream over the
// other. To mitigate this, randomly decide whether to write DSR or non
// DSR data first which at least ensures fairness over time.
auto writeLoopBeginTime = Clock::now(); auto writeLoopBeginTime = Clock::now();
auto nonDsrPath = [&]() { auto nonDsrPath = [&](auto limit) {
return writeQuicDataToSocket( return writeQuicDataToSocket(
*socket_, *socket_,
*conn_, *conn_,
@ -340,33 +335,34 @@ void QuicServerTransport::writeData() {
*conn_->oneRttWriteCipher, *conn_->oneRttWriteCipher,
*conn_->oneRttWriteHeaderCipher, *conn_->oneRttWriteHeaderCipher,
version, version,
packetLimit, limit,
writeLoopBeginTime); writeLoopBeginTime);
}; };
auto dsrPath = [&]() { auto dsrPath = [&](auto limit) {
return writePacketizationRequest( return writePacketizationRequest(
*serverConn_, *serverConn_,
destConnId, destConnId,
packetLimit, limit,
*conn_->oneRttWriteCipher, *conn_->oneRttWriteCipher,
writeLoopBeginTime); writeLoopBeginTime);
}; };
if (folly::Random::oneIn(2)) { // We need a while loop because both paths write streams from the same
if (packetLimit && congestionControlWritableBytes(*serverConn_)) { // queue, which can result in empty writes.
packetLimit -= dsrPath(); while (packetLimit) {
} auto startingPacketLimit = packetLimit;
if (packetLimit) { // Give the non-DSR path a chance.
packetLimit -= nonDsrPath().packetsWritten; auto written = nonDsrPath(packetLimit);
}
} else {
auto written = nonDsrPath();
// If we didn't write much from the non DSR path, don't penalize DSR // If we didn't write much from the non DSR path, don't penalize DSR
// a full packet. // a full packet.
if (written.bytesWritten >= conn_->udpSendPacketLen / 2) { if (written.bytesWritten >= conn_->udpSendPacketLen / 2) {
packetLimit -= written.packetsWritten; packetLimit -= written.packetsWritten;
} }
if (packetLimit && congestionControlWritableBytes(*serverConn_)) { if (packetLimit && congestionControlWritableBytes(*serverConn_)) {
packetLimit -= dsrPath(); packetLimit -= dsrPath(packetLimit);
}
if (startingPacketLimit == packetLimit) {
// We haven't written anything with either path, so we're done.
break;
} }
} }
} }

View File

@ -170,7 +170,6 @@ AckEvent processAckFrame(
streamFrame->offset, streamFrame->len, streamFrame->fin); streamFrame->offset, streamFrame->len, streamFrame->fin);
stream->updateAckedIntervals( stream->updateAckedIntervals(
streamFrame->offset, streamFrame->len, streamFrame->fin); streamFrame->offset, streamFrame->len, streamFrame->fin);
conn.streamManager->updateLossStreams(*stream);
conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateWritableStreams(*stream);
} }
} }

View File

@ -157,6 +157,7 @@ struct PriorityQueue {
folly::F14FastMap<StreamId, OrderId> streamToOrderId; folly::F14FastMap<StreamId, OrderId> streamToOrderId;
}; };
std::vector<Level> levels; std::vector<Level> levels;
using LevelItr = decltype(levels)::const_iterator;
PriorityQueue() : levels(kDefaultPriorityLevelsSize) { PriorityQueue() : levels(kDefaultPriorityLevelsSize) {
for (size_t index = 0; index < levels.size(); index++) { for (size_t index = 0; index < levels.size(); index++) {
@ -258,6 +259,18 @@ struct PriorityQueue {
return level.iterator->nextStreamIt->streamId; return level.iterator->nextStreamIt->streamId;
} }
FOLLY_NODISCARD StreamId getNextScheduledStream() const {
const auto& levelIter =
std::find_if(levels.cbegin(), levels.cend(), [&](const auto& level) {
return !level.empty();
});
// The expectation is that calling this function on an empty queue is
// a bug.
CHECK(levelIter != levels.cend());
levelIter->iterator->begin();
return levelIter->iterator->current();
}
private: private:
folly::F14FastMap<StreamId, uint8_t> writableStreamsToLevel_; folly::F14FastMap<StreamId, uint8_t> writableStreamsToLevel_;
using WSIterator = decltype(writableStreamsToLevel_)::iterator; using WSIterator = decltype(writableStreamsToLevel_)::iterator;

View File

@ -246,10 +246,7 @@ bool QuicStreamManager::setStreamPriority(StreamId id, Priority newPriority) {
} }
notifyStreamPriorityChanges(); notifyStreamPriorityChanges();
} }
// If this stream is already in the writable or loss queues, update the writeQueue_.updateIfExist(id, stream->priority);
// priority there.
writableStreams_.updateIfExist(id, stream->priority);
writableDSRStreams_.updateIfExist(id, stream->priority);
return true; return true;
} }
return false; return false;
@ -560,10 +557,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
DCHECK(it->second.inTerminalStates()); DCHECK(it->second.inTerminalStates());
readableStreams_.erase(streamId); readableStreams_.erase(streamId);
peekableStreams_.erase(streamId); peekableStreams_.erase(streamId);
writableStreams_.erase(streamId); removeWritable(it->second);
writableDSRStreams_.erase(streamId);
writableControlStreams_.erase(streamId);
removeLoss(streamId);
blockedStreams_.erase(streamId); blockedStreams_.erase(streamId);
deliverableStreams_.erase(streamId); deliverableStreams_.erase(streamId);
txStreams_.erase(streamId); txStreams_.erase(streamId);
@ -644,22 +638,49 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
CHECK(stream.lossBuffer.empty()); CHECK(stream.lossBuffer.empty());
CHECK(stream.lossBufMetas.empty()); CHECK(stream.lossBufMetas.empty());
removeWritable(stream); removeWritable(stream);
removeDSRWritable(stream); writableStreams_.erase(stream.id);
writableDSRStreams_.erase(stream.id);
lossStreams_.erase(stream.id);
lossDSRStreams_.erase(stream.id);
if (stream.isControl) {
controlWriteQueue_.erase(stream.id);
} else {
writeQueue_.erase(stream.id);
}
return; return;
} }
if (stream.hasWritableData() || !stream.lossBuffer.empty()) { if (stream.hasWritableData()) {
addWritable(stream); writableStreams_.emplace(stream.id);
} else { } else {
removeWritable(stream); writableStreams_.erase(stream.id);
} }
if (stream.isControl) { if (stream.hasWritableBufMeta()) {
return; writableDSRStreams_.emplace(stream.id);
}
if (stream.dsrSender &&
(stream.hasWritableBufMeta() || !stream.lossBufMetas.empty())) {
addDSRWritable(stream);
} else { } else {
removeDSRWritable(stream); writableDSRStreams_.erase(stream.id);
}
if (!stream.lossBuffer.empty()) {
lossStreams_.emplace(stream.id);
} else {
lossStreams_.erase(stream.id);
}
if (!stream.lossBufMetas.empty()) {
lossDSRStreams_.emplace(stream.id);
} else {
lossDSRStreams_.erase(stream.id);
}
if (stream.hasSchedulableData() || stream.hasSchedulableDsr()) {
if (stream.isControl) {
controlWriteQueue_.emplace(stream.id);
} else {
writeQueue_.insertOrUpdate(stream.id, stream.priority);
}
} else {
if (stream.isControl) {
controlWriteQueue_.erase(stream.id);
} else {
writeQueue_.erase(stream.id);
}
} }
} }

View File

@ -130,9 +130,10 @@ class QuicStreamManager {
lossDSRStreams_ = std::move(other.lossDSRStreams_); lossDSRStreams_ = std::move(other.lossDSRStreams_);
readableStreams_ = std::move(other.readableStreams_); readableStreams_ = std::move(other.readableStreams_);
peekableStreams_ = std::move(other.peekableStreams_); peekableStreams_ = std::move(other.peekableStreams_);
writeQueue_ = std::move(other.writeQueue_);
controlWriteQueue_ = std::move(other.controlWriteQueue_);
writableStreams_ = std::move(other.writableStreams_); writableStreams_ = std::move(other.writableStreams_);
writableDSRStreams_ = std::move(other.writableDSRStreams_); writableDSRStreams_ = std::move(other.writableDSRStreams_);
writableControlStreams_ = std::move(other.writableControlStreams_);
txStreams_ = std::move(other.txStreams_); txStreams_ = std::move(other.txStreams_);
deliverableStreams_ = std::move(other.deliverableStreams_); deliverableStreams_ = std::move(other.deliverableStreams_);
closedStreams_ = std::move(other.closedStreams_); closedStreams_ = std::move(other.closedStreams_);
@ -385,19 +386,6 @@ class QuicStreamManager {
lossStreams_.insert(id); lossStreams_.insert(id);
} }
void updateLossStreams(const QuicStreamState& stream) {
if (!stream.hasLoss()) {
removeLoss(stream.id);
} else {
if (!stream.lossBuffer.empty()) {
lossStreams_.emplace(stream.id);
}
if (!stream.lossBufMetas.empty()) {
lossDSRStreams_.emplace(stream.id);
}
}
}
/** /**
* Update stream priority if the stream indicated by id exists, and the * Update stream priority if the stream indicated by id exists, and the
* passed in values are different from current priority. Return true if * passed in values are different from current priority. Return true if
@ -423,16 +411,19 @@ class QuicStreamManager {
* Returns a mutable reference to the container holding the writable stream * Returns a mutable reference to the container holding the writable stream
* IDs. * IDs.
*/ */
auto& writableControlStreams() { auto& controlWriteQueue() {
return writableControlStreams_; return controlWriteQueue_;
}
auto& writeQueue() {
return writeQueue_;
} }
/* /*
* Returns if there are any writable streams. * Returns if there are any writable streams.
*/ */
bool hasWritable() const { bool hasWritable() const {
return !writableStreams_.empty() || !writableDSRStreams_.empty() || return !writeQueue_.empty() || !controlWriteQueue_.empty();
!writableControlStreams_.empty();
} }
FOLLY_NODISCARD bool hasDSRWritable() const { FOLLY_NODISCARD bool hasDSRWritable() const {
@ -440,7 +431,7 @@ class QuicStreamManager {
} }
bool hasNonDSRWritable() const { bool hasNonDSRWritable() const {
return !writableStreams_.empty() || !writableControlStreams_.empty(); return !writableStreams_.empty() || !controlWriteQueue_.empty();
} }
/* /*
@ -448,17 +439,26 @@ class QuicStreamManager {
*/ */
void addWritable(const QuicStreamState& stream) { void addWritable(const QuicStreamState& stream) {
if (stream.isControl) { if (stream.isControl) {
writableControlStreams_.insert(stream.id); // Control streams get their own queue.
CHECK(stream.hasSchedulableData());
controlWriteQueue_.insert(stream.id);
} else { } else {
CHECK(stream.hasWritableData() || !stream.lossBuffer.empty()); CHECK(stream.hasSchedulableData() || stream.hasSchedulableDsr());
writableStreams_.insertOrUpdate(stream.id, stream.priority); writeQueue_.insertOrUpdate(stream.id, stream.priority);
}
if (stream.hasWritableData()) {
writableStreams_.insert(stream.id);
}
if (stream.hasWritableBufMeta()) {
LOG(ERROR) << "writable DSR: " << stream.id;
writableDSRStreams_.insert(stream.id);
}
if (!stream.lossBuffer.empty()) {
lossStreams_.insert(stream.id);
}
if (!stream.lossBufMetas.empty()) {
lossDSRStreams_.insert(stream.id);
} }
}
void addDSRWritable(const QuicStreamState& stream) {
CHECK(!stream.isControl);
CHECK(stream.hasWritableBufMeta() || !stream.lossBufMetas.empty());
writableDSRStreams_.insertOrUpdate(stream.id, stream.priority);
} }
/* /*
@ -466,15 +466,14 @@ class QuicStreamManager {
*/ */
void removeWritable(const QuicStreamState& stream) { void removeWritable(const QuicStreamState& stream) {
if (stream.isControl) { if (stream.isControl) {
writableControlStreams_.erase(stream.id); controlWriteQueue_.erase(stream.id);
} else { } else {
writableStreams_.erase(stream.id); writeQueue_.erase(stream.id);
} }
} writableStreams_.erase(stream.id);
void removeDSRWritable(const QuicStreamState& stream) {
CHECK(!stream.isControl);
writableDSRStreams_.erase(stream.id); writableDSRStreams_.erase(stream.id);
lossStreams_.erase(stream.id);
lossDSRStreams_.erase(stream.id);
} }
/* /*
@ -483,7 +482,8 @@ class QuicStreamManager {
void clearWritable() { void clearWritable() {
writableStreams_.clear(); writableStreams_.clear();
writableDSRStreams_.clear(); writableDSRStreams_.clear();
writableControlStreams_.clear(); writeQueue_.clear();
controlWriteQueue_.clear();
} }
/* /*
@ -1166,12 +1166,14 @@ class QuicStreamManager {
// Set of streams that have pending peeks // Set of streams that have pending peeks
folly::F14FastSet<StreamId> peekableStreams_; folly::F14FastSet<StreamId> peekableStreams_;
// Set of !control streams that have writable data // Set of !control streams that have writable data used for frame scheduling
PriorityQueue writableStreams_; PriorityQueue writeQueue_;
PriorityQueue writableDSRStreams_;
// Set of control streams that have writable data // Set of control streams that have writable data
std::set<StreamId> writableControlStreams_; std::set<StreamId> controlWriteQueue_;
folly::F14FastSet<StreamId> writableStreams_;
folly::F14FastSet<StreamId> writableDSRStreams_;
// Streams that may be able to call TxCallback // Streams that may be able to call TxCallback
folly::F14FastSet<StreamId> txStreams_; folly::F14FastSet<StreamId> txStreams_;

View File

@ -411,6 +411,15 @@ struct QuicStreamState : public QuicStreamLike {
return false; return false;
} }
// Whether this stream has non-DSR data in the write buffer or loss buffer.
FOLLY_NODISCARD bool hasSchedulableData() const {
return hasWritableData() || !lossBuffer.empty();
}
FOLLY_NODISCARD bool hasSchedulableDsr() const {
return hasWritableBufMeta() || !lossBufMetas.empty();
}
FOLLY_NODISCARD bool hasWritableBufMeta() const { FOLLY_NODISCARD bool hasWritableBufMeta() const {
if (writeBufMeta.offset == 0) { if (writeBufMeta.offset == 0) {
return false; return false;

View File

@ -27,7 +27,6 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) {
} }
stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateReadableStreams(stream);
stream.conn.streamManager->updateWritableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream);
stream.conn.streamManager->updateLossStreams(stream);
} }
void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) { void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) {
@ -63,7 +62,6 @@ void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) {
} }
stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateReadableStreams(stream);
stream.conn.streamManager->updateWritableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream);
stream.conn.streamManager->updateLossStreams(stream);
QUIC_STATS(stream.conn.statsCallback, onQuicStreamReset, frame.errorCode); QUIC_STATS(stream.conn.statsCallback, onQuicStreamReset, frame.errorCode);
} }

View File

@ -2236,14 +2236,14 @@ TEST_F(QuicStreamFunctionsTest, LossBufferEmpty) {
StreamId id = 4; StreamId id = 4;
QuicStreamState stream(id, conn); QuicStreamState stream(id, conn);
conn.streamManager->addLoss(id); conn.streamManager->addLoss(id);
conn.streamManager->updateLossStreams(stream); conn.streamManager->updateWritableStreams(stream);
EXPECT_FALSE(conn.streamManager->hasLoss()); EXPECT_FALSE(conn.streamManager->hasLoss());
} }
TEST_F(QuicStreamFunctionsTest, LossBufferEmptyNoChange) { TEST_F(QuicStreamFunctionsTest, LossBufferEmptyNoChange) {
StreamId id = 4; StreamId id = 4;
QuicStreamState stream(id, conn); QuicStreamState stream(id, conn);
conn.streamManager->updateLossStreams(stream); conn.streamManager->updateWritableStreams(stream);
EXPECT_FALSE(conn.streamManager->hasLoss()); EXPECT_FALSE(conn.streamManager->hasLoss());
} }
@ -2251,7 +2251,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferHasData) {
StreamId id = 4; StreamId id = 4;
QuicStreamState stream(id, conn); QuicStreamState stream(id, conn);
stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false);
conn.streamManager->updateLossStreams(stream); conn.streamManager->updateWritableStreams(stream);
EXPECT_TRUE(conn.streamManager->hasLoss()); EXPECT_TRUE(conn.streamManager->hasLoss());
} }
@ -2263,7 +2263,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferMetaHasData) {
b.setOffset(10); b.setOffset(10);
b.setEOF(false); b.setEOF(false);
stream.lossBufMetas.emplace_back(b.build()); stream.lossBufMetas.emplace_back(b.build());
conn.streamManager->updateLossStreams(stream); conn.streamManager->updateWritableStreams(stream);
EXPECT_TRUE(conn.streamManager->hasLoss()); EXPECT_TRUE(conn.streamManager->hasLoss());
} }
@ -2276,7 +2276,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferMetaStillHasData) {
b.setOffset(10); b.setOffset(10);
b.setEOF(false); b.setEOF(false);
stream.lossBufMetas.emplace_back(b.build()); stream.lossBufMetas.emplace_back(b.build());
conn.streamManager->updateLossStreams(stream); conn.streamManager->updateWritableStreams(stream);
EXPECT_TRUE(conn.streamManager->hasLoss()); EXPECT_TRUE(conn.streamManager->hasLoss());
} }
@ -2285,7 +2285,7 @@ TEST_F(QuicStreamFunctionsTest, LossBufferStillHasData) {
QuicStreamState stream(id, conn); QuicStreamState stream(id, conn);
conn.streamManager->addLoss(id); conn.streamManager->addLoss(id);
stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false);
conn.streamManager->updateLossStreams(stream); conn.streamManager->updateWritableStreams(stream);
EXPECT_TRUE(conn.streamManager->hasLoss()); EXPECT_TRUE(conn.streamManager->hasLoss());
} }