mirror of https://github.com/facebookincubator/mvfst.git

Fix stream scheduling round robin.

Summary:
The intention here was always to write to streams in a round-robin fashion. However, this functionality has been effectively broken since its introduction, because `lastScheduledStream` was never set. We can fix this by having the `StreamFrameScheduler` set `nextScheduledStream` (renamed from `lastScheduledStream`) after it has written to the streams. Additionally, we need to remove a check that kept us from moving past a stream if it still had data left to write.

In extreme cases this would starve some streams completely and ruin concurrency.

Reviewed By: siyengar

Differential Revision: D17748652

fbshipit-source-id: a3d05c54ee7eaed4d858df9d89035fe8f252c727
Author: Matt Joras, 2019-10-08 12:21:52 -07:00
Committed by: Facebook Github Bot
Parent: cff5937685
Commit: c0a0919113
5 changed files with 102 additions and 26 deletions
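
The summary above describes the mechanism at a high level: remember where the scheduler stopped, resume there for the next packet, and always advance past a stream once a frame has been written for it. Below is a minimal, self-contained sketch of that behavior; `PendingStream`, `SchedulingState`, and `buildPacket` are made-up illustration-only names, not the mvfst API.

```cpp
// Minimal sketch of round-robin stream scheduling with a persistent resume
// point. PendingStream, SchedulingState and buildPacket are hypothetical
// names for illustration only, not the mvfst API.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

using StreamId = uint64_t;

struct PendingStream {
  std::string data; // bytes still waiting to be written
};

struct SchedulingState {
  StreamId nextScheduledStream{0}; // persists across packets
};

// Write up to packetCapacity bytes, visiting streams round robin starting at
// state.nextScheduledStream, and remember where to resume for the next packet.
std::vector<StreamId> buildPacket(
    std::map<StreamId, PendingStream>& streams,
    SchedulingState& state,
    std::size_t packetCapacity) {
  std::vector<StreamId> written;
  auto it = streams.lower_bound(state.nextScheduledStream);
  if (it == streams.end()) {
    it = streams.begin(); // wrap if the remembered stream is past the end
  }
  std::size_t visited = 0;
  while (packetCapacity > 0 && visited < streams.size()) {
    auto& [id, stream] = *it;
    std::size_t toWrite = std::min(packetCapacity, stream.data.size());
    stream.data.erase(0, toWrite);
    packetCapacity -= toWrite;
    if (toWrite > 0) {
      written.push_back(id);
    }
    // Always advance, even if the stream still has data left; keeping the
    // iterator parked on such a stream is what broke fairness.
    ++it;
    ++visited;
    if (it == streams.end()) {
      it = streams.begin(); // wrap around the collection
    }
  }
  if (!streams.empty()) {
    // The key fix: remember the next stream so the following packet starts there.
    state.nextScheduledStream = it->first;
  }
  return written;
}

int main() {
  std::map<StreamId, PendingStream> streams{
      {0, {std::string(2000, 'a')}}, {4, {"some data"}}, {8, {"some data"}}};
  SchedulingState state;
  buildPacket(streams, state, 1200); // packet 1 fills up on stream 0
  std::cout << "resume at stream " << state.nextScheduledStream << "\n"; // 4
  for (StreamId id : buildPacket(streams, state, 1200)) {
    std::cout << "wrote stream " << id << "\n"; // 4, 8, then 0 again
  }
  return 0;
}
```

Persisting the resume point in per-connection state is what prevents a stream with a large backlog from monopolizing every packet; the next packet picks up with the streams that were skipped.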


@@ -32,7 +32,7 @@ folly::Optional<PacketNum> largestAckToSend(const AckState& ackState) {
 // Schedulers
 FrameScheduler::Builder::Builder(
-    const QuicConnectionStateBase& conn,
+    QuicConnectionStateBase& conn,
     EncryptionLevel encryptionLevel,
     PacketNumberSpace packetNumberSpace,
     std::string name)
@@ -229,23 +229,29 @@ bool RetransmissionScheduler::hasPendingData() const {
   return !conn_.streamManager->lossStreams().empty();
 }
 
-StreamFrameScheduler::StreamFrameScheduler(const QuicConnectionStateBase& conn)
+StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn)
     : conn_(conn) {}
 
 void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
   uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
   MiddleStartingIterationWrapper wrapper(
       conn_.streamManager->writableStreams(),
-      conn_.schedulingState.lastScheduledStream);
+      conn_.schedulingState.nextScheduledStream);
   auto writableStreamItr = wrapper.cbegin();
+  // This will write the stream frames in a round robin fashion ordered by
+  // stream id. The iterator will wrap around the collection at the end, and we
+  // keep track of the value at the next iteration. This allows us to start
+  // writing at the next stream when building the next packet.
+  // TODO experiment with writing streams with an actual prioritization scheme.
   while (writableStreamItr != wrapper.cend() && connWritableBytes > 0) {
-    auto res =
-        writeNextStreamFrame(builder, writableStreamItr, connWritableBytes);
-    if (!res) {
+    if (writeNextStreamFrame(builder, *writableStreamItr, connWritableBytes)) {
+      writableStreamItr++;
+    } else {
       break;
     }
   }
+  conn_.schedulingState.nextScheduledStream = *writableStreamItr;
 }
 
 bool StreamFrameScheduler::hasPendingData() const {
   return conn_.streamManager->hasWritable() &&
@@ -254,12 +260,12 @@ bool StreamFrameScheduler::hasPendingData() const {
 bool StreamFrameScheduler::writeNextStreamFrame(
     PacketBuilderInterface& builder,
-    StreamFrameScheduler::WritableStreamItr& writableStreamItr,
+    StreamId streamId,
     uint64_t& connWritableBytes) {
   if (builder.remainingSpaceInPkt() == 0) {
     return false;
   }
-  auto stream = conn_.streamManager->findStream(*writableStreamItr);
+  auto stream = conn_.streamManager->findStream(streamId);
   CHECK(stream);
   // hasWritableData is the condition which has to be satisfied for the
@@ -288,15 +294,6 @@ bool StreamFrameScheduler::writeNextStreamFrame(
<< " finWritten=" << (canWriteFin && *dataLen == bufferLen) << " " << " finWritten=" << (canWriteFin && *dataLen == bufferLen) << " "
<< conn_; << conn_;
connWritableBytes -= dataLen.value(); connWritableBytes -= dataLen.value();
// bytesWritten < min(flowControlBytes, writeBuffer) means that we haven't
// written all writable bytes in this stream due to running out of room in the
// packet.
if (*dataLen ==
std::min<uint64_t>(
getSendStreamFlowControlBytesWire(*stream),
stream->writeBuffer.chainLength())) {
++writableStreamItr;
}
return true; return true;
} }
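
The `MiddleStartingIterationWrapper` used by `writeStreams` above provides the wrap-around iteration that the new comment describes: begin at `nextScheduledStream`, walk to the end of the writable-stream collection, then continue from its beginning. A rough, simplified stand-in is sketched below (a hypothetical `visitFrom` helper over a `std::set`, not the actual mvfst class), producing the same visiting order.

```cpp
// Rough stand-in for wrap-around ("middle-starting") iteration; simplified,
// not the real MiddleStartingIterationWrapper.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <set>

using StreamId = uint64_t;

// Visit every element exactly once, starting at the first id >= start and
// wrapping around to the beginning of the set.
template <typename Visitor>
void visitFrom(const std::set<StreamId>& streams, StreamId start, Visitor visit) {
  auto it = streams.lower_bound(start);
  for (std::size_t n = 0; n < streams.size(); ++n) {
    if (it == streams.end()) {
      it = streams.begin(); // wrap around the collection
    }
    visit(*it++);
  }
}

int main() {
  std::set<StreamId> writable{0, 4, 8, 12};
  // Starting at stream 8 visits 8, 12, then wraps to 0 and 4.
  visitFrom(writable, 8, [](StreamId id) { std::cout << id << " "; });
  std::cout << "\n";
  return 0;
}
```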


@@ -70,7 +70,7 @@ class RetransmissionScheduler {
 class StreamFrameScheduler {
  public:
-  explicit StreamFrameScheduler(const QuicConnectionStateBase& conn);
+  explicit StreamFrameScheduler(QuicConnectionStateBase& conn);
 
   /**
    * Return: the first boolean indicates if at least one Blocked frame
@@ -169,10 +169,10 @@ class StreamFrameScheduler {
    */
   bool writeNextStreamFrame(
       PacketBuilderInterface& builder,
-      WritableStreamItr& writableStreamItr,
+      StreamId streamId,
       uint64_t& connWritableBytes);
 
-  const QuicConnectionStateBase& conn_;
+  QuicConnectionStateBase& conn_;
 };
 
 class AckScheduler {
@@ -306,7 +306,7 @@ class FrameScheduler : public QuicPacketScheduler {
   struct Builder {
     Builder(
-        const QuicConnectionStateBase& conn,
+        QuicConnectionStateBase& conn,
         EncryptionLevel encryptionLevel,
         PacketNumberSpace packetNumberSpace,
         std::string name);
@@ -323,7 +323,7 @@ class FrameScheduler : public QuicPacketScheduler {
     FrameScheduler build() &&;
 
    private:
-    const QuicConnectionStateBase& conn_;
+    QuicConnectionStateBase& conn_;
     EncryptionLevel encryptionLevel_;
     PacketNumberSpace packetNumberSpace_;
     std::string name_;


@@ -13,8 +13,10 @@
 #include <quic/api/test/Mocks.h>
 #include <quic/client/state/ClientStateMachine.h>
 #include <quic/codec/QuicPacketBuilder.h>
+#include <quic/codec/test/Mocks.h>
 #include <quic/common/test/TestUtils.h>
 #include <quic/server/state/ServerStateMachine.h>
+#include <quic/state/QuicStreamFunctions.h>
 
 using namespace quic;
 using namespace testing;
@@ -682,5 +684,82 @@ TEST_F(QuicPacketSchedulerTest, LargestAckToSend) {
   EXPECT_EQ(folly::none, largestAckToSend(conn.ackStates.appDataAckState));
 }
 
+TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) {
+  QuicClientConnectionState conn;
+  conn.streamManager->setMaxLocalBidirectionalStreams(10);
+  conn.flowControlState.peerAdvertisedMaxOffset = 100000;
+  conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000;
+  auto connId = getTestConnectionId();
+  StreamFrameScheduler scheduler(conn);
+  ShortHeader shortHeader(
+      ProtectionType::KeyPhaseZero,
+      connId,
+      getNextPacketNum(conn, PacketNumberSpace::AppData));
+  RegularQuicPacketBuilder builder(
+      conn.udpSendPacketLen,
+      std::move(shortHeader),
+      conn.ackStates.appDataAckState.largestAckedByPeer);
+  auto stream1 = conn.streamManager->createNextBidirectionalStream().value();
+  auto stream2 = conn.streamManager->createNextBidirectionalStream().value();
+  auto stream3 = conn.streamManager->createNextBidirectionalStream().value();
+  writeDataToQuicStream(*stream1, folly::IOBuf::copyBuffer("some data"), false);
+  writeDataToQuicStream(*stream2, folly::IOBuf::copyBuffer("some data"), false);
+  writeDataToQuicStream(*stream3, folly::IOBuf::copyBuffer("some data"), false);
+  scheduler.writeStreams(builder);
+  EXPECT_EQ(conn.schedulingState.nextScheduledStream, 0);
+}
+
+TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) {
+  QuicClientConnectionState conn;
+  conn.streamManager->setMaxLocalBidirectionalStreams(10);
+  conn.flowControlState.peerAdvertisedMaxOffset = 100000;
+  conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000;
+  auto connId = getTestConnectionId();
+  StreamFrameScheduler scheduler(conn);
+  ShortHeader shortHeader1(
+      ProtectionType::KeyPhaseZero,
+      connId,
+      getNextPacketNum(conn, PacketNumberSpace::AppData));
+  RegularQuicPacketBuilder builder1(
+      conn.udpSendPacketLen,
+      std::move(shortHeader1),
+      conn.ackStates.appDataAckState.largestAckedByPeer);
+  auto stream1 = conn.streamManager->createNextBidirectionalStream().value();
+  auto stream2 = conn.streamManager->createNextBidirectionalStream().value();
+  auto stream3 = conn.streamManager->createNextBidirectionalStream().value();
+  auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 2, 4096);
+  auto curBuf = largeBuf.get();
+  do {
+    curBuf->append(curBuf->capacity());
+    curBuf = curBuf->next();
+  } while (curBuf != largeBuf.get());
+  auto chainLen = largeBuf->computeChainDataLength();
+  writeDataToQuicStream(*stream1, std::move(largeBuf), false);
+  writeDataToQuicStream(*stream2, folly::IOBuf::copyBuffer("some data"), false);
+  writeDataToQuicStream(*stream3, folly::IOBuf::copyBuffer("some data"), false);
+  scheduler.writeStreams(builder1);
+  EXPECT_EQ(conn.schedulingState.nextScheduledStream, 4);
+  // Should write frames for stream2, stream3, followed by stream1 again.
+  MockQuicPacketBuilder builder2;
+  EXPECT_CALL(builder2, remainingSpaceInPkt()).WillRepeatedly(Return(4096));
+  EXPECT_CALL(builder2, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) {
+    builder2.frames_.push_back(f);
+  }));
+  scheduler.writeStreams(builder2);
+  auto& frames = builder2.frames_;
+  ASSERT_EQ(frames.size(), 3);
+  auto packet = builder2.frames_;
+  WriteStreamFrame f1(stream2->id, 0, 9, false);
+  WriteStreamFrame f2(stream3->id, 0, 9, false);
+  WriteStreamFrame f3(stream1->id, 0, chainLen, false);
+  ASSERT_TRUE(frames[0].asWriteStreamFrame());
+  EXPECT_EQ(*frames[0].asWriteStreamFrame(), f1);
+  ASSERT_TRUE(frames[1].asWriteStreamFrame());
+  EXPECT_EQ(*frames[1].asWriteStreamFrame(), f2);
+  ASSERT_TRUE(frames[2].asWriteStreamFrame());
+  EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3);
+}
+
 } // namespace test
 } // namespace quic


@@ -2338,7 +2338,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) {
   conn.outstandingPackets.clear();
 
   // Start from stream2 instead of stream1
-  conn.schedulingState.lastScheduledStream = s2;
+  conn.schedulingState.nextScheduledStream = s2;
   writableBytes = kDefaultUDPSendPacketLen - 100;
 
   EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
@@ -2361,7 +2361,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) {
   conn.outstandingPackets.clear();
 
   // Test wrap around
-  conn.schedulingState.lastScheduledStream = s2;
+  conn.schedulingState.nextScheduledStream = s2;
   writableBytes = kDefaultUDPSendPacketLen;
 
   EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
   writeQuicDataToSocket(


@@ -615,7 +615,7 @@ struct QuicConnectionStateBase {
   uint64_t udpSendPacketLen{kDefaultUDPSendPacketLen};
 
   struct PacketSchedulingState {
-    StreamId lastScheduledStream{0};
+    StreamId nextScheduledStream{0};
   };
 
   PacketSchedulingState schedulingState;