1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-05 11:21:09 +03:00

Write control streams before other streams.`

Summary: For protocols like HTTP/3, lacking an actual priority scheme, it's a good idea to write control streams before non control streams. Implement this in a round robin fashion in the same way we do for other streams.

Reviewed By: afrind

Differential Revision: D18236010

fbshipit-source-id: faee9af7fff7736679bfea262ac18d677a7cbf78
This commit is contained in:
Matt Joras
2019-11-04 14:17:51 -08:00
committed by Facebook Github Bot
parent af58963fe4
commit 1c0794abd7
8 changed files with 143 additions and 18 deletions

View File

@@ -232,11 +232,12 @@ bool RetransmissionScheduler::hasPendingData() const {
StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn) StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn)
: conn_(conn) {} : conn_(conn) {}
void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) { StreamId StreamFrameScheduler::writeStreamsHelper(
uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_); PacketBuilderInterface& builder,
MiddleStartingIterationWrapper wrapper( const std::set<StreamId>& writableStreams,
conn_.streamManager->writableStreams(), StreamId nextScheduledStream,
conn_.schedulingState.nextScheduledStream); uint64_t& connWritableBytes) {
MiddleStartingIterationWrapper wrapper(writableStreams, nextScheduledStream);
auto writableStreamItr = wrapper.cbegin(); auto writableStreamItr = wrapper.cbegin();
// This will write the stream frames in a round robin fashion ordered by // This will write the stream frames in a round robin fashion ordered by
// stream id. The iterator will wrap around the collection at the end, and we // stream id. The iterator will wrap around the collection at the end, and we
@@ -250,7 +251,36 @@ void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
break; break;
} }
} }
conn_.schedulingState.nextScheduledStream = *writableStreamItr; return *writableStreamItr;
}
void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
DCHECK(conn_.streamManager->hasWritable());
uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
if (connWritableBytes == 0) {
return;
}
// Write the control streams first as a naive binary priority mechanism.
const auto& writableControlStreams =
conn_.streamManager->writableControlStreams();
if (!writableControlStreams.empty()) {
conn_.schedulingState.nextScheduledControlStream = writeStreamsHelper(
builder,
writableControlStreams,
conn_.schedulingState.nextScheduledControlStream,
connWritableBytes);
}
if (connWritableBytes == 0) {
return;
}
const auto& writableStreams = conn_.streamManager->writableStreams();
if (!writableStreams.empty()) {
conn_.schedulingState.nextScheduledStream = writeStreamsHelper(
builder,
writableStreams,
conn_.schedulingState.nextScheduledStream,
connWritableBytes);
}
} // namespace quic } // namespace quic
bool StreamFrameScheduler::hasPendingData() const { bool StreamFrameScheduler::hasPendingData() const {

View File

@@ -159,6 +159,12 @@ class StreamFrameScheduler {
const MapType::key_type& start_; const MapType::key_type& start_;
}; };
StreamId writeStreamsHelper(
PacketBuilderInterface& builder,
const std::set<StreamId>& writableStreams,
StreamId nextScheduledStream,
uint64_t& connWritableBytes);
using WritableStreamItr = using WritableStreamItr =
MiddleStartingIterationWrapper::MiddleStartingIterator; MiddleStartingIterationWrapper::MiddleStartingIterator;

View File

@@ -762,6 +762,70 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) {
EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3); EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3);
} }
TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) {
QuicClientConnectionState conn;
conn.streamManager->setMaxLocalBidirectionalStreams(10);
conn.flowControlState.peerAdvertisedMaxOffset = 100000;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000;
auto connId = getTestConnectionId();
StreamFrameScheduler scheduler(conn);
ShortHeader shortHeader1(
ProtectionType::KeyPhaseZero,
connId,
getNextPacketNum(conn, PacketNumberSpace::AppData));
RegularQuicPacketBuilder builder1(
conn.udpSendPacketLen,
std::move(shortHeader1),
conn.ackStates.appDataAckState.largestAckedByPeer);
auto stream1 = conn.streamManager->createNextBidirectionalStream().value();
auto stream2 = conn.streamManager->createNextBidirectionalStream().value();
auto stream3 = conn.streamManager->createNextBidirectionalStream().value();
auto stream4 = conn.streamManager->createNextBidirectionalStream().value();
conn.streamManager->setStreamAsControl(*stream2);
conn.streamManager->setStreamAsControl(*stream4);
auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 2, 4096);
auto curBuf = largeBuf.get();
do {
curBuf->append(curBuf->capacity());
curBuf = curBuf->next();
} while (curBuf != largeBuf.get());
auto chainLen = largeBuf->computeChainDataLength();
writeDataToQuicStream(*stream1, std::move(largeBuf), false);
writeDataToQuicStream(*stream2, folly::IOBuf::copyBuffer("some data"), false);
writeDataToQuicStream(*stream3, folly::IOBuf::copyBuffer("some data"), false);
writeDataToQuicStream(*stream4, folly::IOBuf::copyBuffer("some data"), false);
// Force the wraparound initially.
conn.schedulingState.nextScheduledStream = stream4->id + 8;
scheduler.writeStreams(builder1);
EXPECT_EQ(conn.schedulingState.nextScheduledStream, stream3->id);
EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2->id);
// Should write frames for stream2, stream4, followed by stream 3 then 1.
MockQuicPacketBuilder builder2;
EXPECT_CALL(builder2, remainingSpaceInPkt()).WillRepeatedly(Return(4096));
EXPECT_CALL(builder2, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) {
builder2.frames_.push_back(f);
}));
scheduler.writeStreams(builder2);
auto& frames = builder2.frames_;
ASSERT_EQ(frames.size(), 4);
WriteStreamFrame f1(stream2->id, 0, 9, false);
WriteStreamFrame f2(stream4->id, 0, 9, false);
WriteStreamFrame f3(stream3->id, 0, 9, false);
WriteStreamFrame f4(stream1->id, 0, chainLen, false);
ASSERT_TRUE(frames[0].asWriteStreamFrame());
EXPECT_EQ(*frames[0].asWriteStreamFrame(), f1);
ASSERT_TRUE(frames[1].asWriteStreamFrame());
EXPECT_EQ(*frames[1].asWriteStreamFrame(), f2);
ASSERT_TRUE(frames[2].asWriteStreamFrame());
EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3);
ASSERT_TRUE(frames[3].asWriteStreamFrame());
EXPECT_EQ(*frames[3].asWriteStreamFrame(), f4);
EXPECT_EQ(conn.schedulingState.nextScheduledStream, stream3->id);
EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2->id);
}
TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) { TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) {
QuicClientConnectionState conn; QuicClientConnectionState conn;
conn.streamManager->setMaxLocalBidirectionalStreams(10); conn.streamManager->setMaxLocalBidirectionalStreams(10);
@@ -808,7 +872,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRemoveOne) {
// Manually remove a stream and set the next scheduled to that stream. // Manually remove a stream and set the next scheduled to that stream.
builder.frames_.clear(); builder.frames_.clear();
conn.streamManager->removeWritable(stream2->id); conn.streamManager->removeWritable(*stream2);
conn.schedulingState.nextScheduledStream = stream2->id; conn.schedulingState.nextScheduledStream = stream2->id;
scheduler.writeStreams(builder); scheduler.writeStreams(builder);
ASSERT_EQ(builder.frames_.size(), 1); ASSERT_EQ(builder.frames_.size(), 1);

View File

@@ -1777,7 +1777,8 @@ TEST_F(QuicTransportFunctionsTest, HasAppDataToWrite) {
auto conn = createConn(); auto conn = createConn();
conn->flowControlState.peerAdvertisedMaxOffset = 1000; conn->flowControlState.peerAdvertisedMaxOffset = 1000;
conn->flowControlState.sumCurWriteOffset = 800; conn->flowControlState.sumCurWriteOffset = 800;
conn->streamManager->addWritable(0); QuicStreamState stream(0, *conn);
conn->streamManager->addWritable(stream);
EXPECT_EQ(WriteDataReason::NO_WRITE, hasNonAckDataToWrite(*conn)); EXPECT_EQ(WriteDataReason::NO_WRITE, hasNonAckDataToWrite(*conn));
conn->oneRttWriteCipher = test::createNoOpAead(); conn->oneRttWriteCipher = test::createNoOpAead();

View File

@@ -364,6 +364,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
readableStreams_.erase(streamId); readableStreams_.erase(streamId);
peekableStreams_.erase(streamId); peekableStreams_.erase(streamId);
writableStreams_.erase(streamId); writableStreams_.erase(streamId);
writableControlStreams_.erase(streamId);
blockedStreams_.erase(streamId); blockedStreams_.erase(streamId);
deliverableStreams_.erase(streamId); deliverableStreams_.erase(streamId);
windowUpdates_.erase(streamId); windowUpdates_.erase(streamId);
@@ -463,9 +464,9 @@ void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) {
void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) { void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
if (stream.hasWritableData() && !stream.streamWriteError.hasValue()) { if (stream.hasWritableData() && !stream.streamWriteError.hasValue()) {
stream.conn.streamManager->addWritable(stream.id); stream.conn.streamManager->addWritable(stream);
} else { } else {
stream.conn.streamManager->removeWritable(stream.id); stream.conn.streamManager->removeWritable(stream);
} }
} }

View File

@@ -216,32 +216,50 @@ class QuicStreamManager {
return writableStreams_; return writableStreams_;
} }
// TODO figure out a better interface here.
/*
* Returns a mutable reference to the container holding the writable stream
* IDs.
*/
auto& writableControlStreams() {
return writableControlStreams_;
}
/* /*
* Returns if there are any writable streams. * Returns if there are any writable streams.
*/ */
bool hasWritable() const { bool hasWritable() const {
return !writableStreams_.empty(); return !writableStreams_.empty() || !writableControlStreams_.empty();
} }
/* /*
* Returns if the current writable streams contains the given id. * Returns if the current writable streams contains the given id.
*/ */
bool writableContains(StreamId streamId) const { bool writableContains(StreamId streamId) const {
return writableStreams_.count(streamId) > 0; return writableStreams_.count(streamId) > 0 ||
writableControlStreams_.count(streamId) > 0;
} }
/* /*
* Add a writable stream id. * Add a writable stream id.
*/ */
void addWritable(StreamId streamId) { void addWritable(const QuicStreamState& stream) {
writableStreams_.insert(streamId); if (stream.isControl) {
writableControlStreams_.insert(stream.id);
} else {
writableStreams_.insert(stream.id);
}
} }
/* /*
* Remove a writable stream id. * Remove a writable stream id.
*/ */
void removeWritable(StreamId streamId) { void removeWritable(const QuicStreamState& stream) {
writableStreams_.erase(streamId); if (stream.isControl) {
writableControlStreams_.erase(stream.id);
} else {
writableStreams_.erase(stream.id);
}
} }
/* /*
@@ -249,6 +267,7 @@ class QuicStreamManager {
*/ */
void clearWritable() { void clearWritable() {
writableStreams_.clear(); writableStreams_.clear();
writableControlStreams_.clear();
} }
/* /*
@@ -737,9 +756,12 @@ class QuicStreamManager {
// List of streams that have pending peeks // List of streams that have pending peeks
std::set<StreamId> peekableStreams_; std::set<StreamId> peekableStreams_;
// List of streams that have writable data // List of !control streams that have writable data
std::set<StreamId> writableStreams_; std::set<StreamId> writableStreams_;
// List of control streams that have writable data
std::set<StreamId> writableControlStreams_;
// List of streams that were blocked // List of streams that were blocked
std::unordered_map<StreamId, StreamDataBlockedFrame> blockedStreams_; std::unordered_map<StreamId, StreamDataBlockedFrame> blockedStreams_;

View File

@@ -676,6 +676,7 @@ struct QuicConnectionStateBase {
struct PacketSchedulingState { struct PacketSchedulingState {
StreamId nextScheduledStream{0}; StreamId nextScheduledStream{0};
StreamId nextScheduledControlStream{0};
}; };
PacketSchedulingState schedulingState; PacketSchedulingState schedulingState;

View File

@@ -1511,7 +1511,7 @@ TEST_F(QuicStreamFunctionsTest, RemovedClosedState) {
auto streamId = stream->id; auto streamId = stream->id;
conn.streamManager->readableStreams().emplace(streamId); conn.streamManager->readableStreams().emplace(streamId);
conn.streamManager->peekableStreams().emplace(streamId); conn.streamManager->peekableStreams().emplace(streamId);
conn.streamManager->addWritable(streamId); conn.streamManager->addWritable(*stream);
conn.streamManager->queueBlocked(streamId, 0); conn.streamManager->queueBlocked(streamId, 0);
conn.streamManager->addDeliverable(streamId); conn.streamManager->addDeliverable(streamId);
conn.streamManager->addLoss(streamId); conn.streamManager->addLoss(streamId);