diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index 328572187..38b099d79 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -32,7 +32,7 @@ folly::Optional largestAckToSend(const AckState& ackState) { // Schedulers FrameScheduler::Builder::Builder( - const QuicConnectionStateBase& conn, + QuicConnectionStateBase& conn, EncryptionLevel encryptionLevel, PacketNumberSpace packetNumberSpace, std::string name) @@ -229,23 +229,29 @@ bool RetransmissionScheduler::hasPendingData() const { return !conn_.streamManager->lossStreams().empty(); } -StreamFrameScheduler::StreamFrameScheduler(const QuicConnectionStateBase& conn) +StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn) : conn_(conn) {} void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) { uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_); MiddleStartingIterationWrapper wrapper( conn_.streamManager->writableStreams(), - conn_.schedulingState.lastScheduledStream); + conn_.schedulingState.nextScheduledStream); auto writableStreamItr = wrapper.cbegin(); + // This will write the stream frames in a round robin fashion ordered by + // stream id. The iterator will wrap around the collection at the end, and we + // keep track of the value at the next iteration. This allows us to start + // writing at the next stream when building the next packet. + // TODO experiment with writing streams with an actual prioritization scheme. while (writableStreamItr != wrapper.cend() && connWritableBytes > 0) { - auto res = - writeNextStreamFrame(builder, writableStreamItr, connWritableBytes); - if (!res) { + if (writeNextStreamFrame(builder, *writableStreamItr, connWritableBytes)) { + writableStreamItr++; + } else { break; } } -} + conn_.schedulingState.nextScheduledStream = *writableStreamItr; +} // namespace quic bool StreamFrameScheduler::hasPendingData() const { return conn_.streamManager->hasWritable() && @@ -254,12 +260,12 @@ bool StreamFrameScheduler::hasPendingData() const { bool StreamFrameScheduler::writeNextStreamFrame( PacketBuilderInterface& builder, - StreamFrameScheduler::WritableStreamItr& writableStreamItr, + StreamId streamId, uint64_t& connWritableBytes) { if (builder.remainingSpaceInPkt() == 0) { return false; } - auto stream = conn_.streamManager->findStream(*writableStreamItr); + auto stream = conn_.streamManager->findStream(streamId); CHECK(stream); // hasWritableData is the condition which has to be satisfied for the @@ -288,15 +294,6 @@ bool StreamFrameScheduler::writeNextStreamFrame( << " finWritten=" << (canWriteFin && *dataLen == bufferLen) << " " << conn_; connWritableBytes -= dataLen.value(); - // bytesWritten < min(flowControlBytes, writeBuffer) means that we haven't - // written all writable bytes in this stream due to running out of room in the - // packet. - if (*dataLen == - std::min( - getSendStreamFlowControlBytesWire(*stream), - stream->writeBuffer.chainLength())) { - ++writableStreamItr; - } return true; } diff --git a/quic/api/QuicPacketScheduler.h b/quic/api/QuicPacketScheduler.h index 86256281d..3006dabfa 100644 --- a/quic/api/QuicPacketScheduler.h +++ b/quic/api/QuicPacketScheduler.h @@ -70,7 +70,7 @@ class RetransmissionScheduler { class StreamFrameScheduler { public: - explicit StreamFrameScheduler(const QuicConnectionStateBase& conn); + explicit StreamFrameScheduler(QuicConnectionStateBase& conn); /** * Return: the first boolean indicates if at least one Blocked frame @@ -169,10 +169,10 @@ class StreamFrameScheduler { */ bool writeNextStreamFrame( PacketBuilderInterface& builder, - WritableStreamItr& writableStreamItr, + StreamId streamId, uint64_t& connWritableBytes); - const QuicConnectionStateBase& conn_; + QuicConnectionStateBase& conn_; }; class AckScheduler { @@ -306,7 +306,7 @@ class FrameScheduler : public QuicPacketScheduler { struct Builder { Builder( - const QuicConnectionStateBase& conn, + QuicConnectionStateBase& conn, EncryptionLevel encryptionLevel, PacketNumberSpace packetNumberSpace, std::string name); @@ -323,7 +323,7 @@ class FrameScheduler : public QuicPacketScheduler { FrameScheduler build() &&; private: - const QuicConnectionStateBase& conn_; + QuicConnectionStateBase& conn_; EncryptionLevel encryptionLevel_; PacketNumberSpace packetNumberSpace_; std::string name_; diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index bb331d669..467eec586 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -13,8 +13,10 @@ #include #include #include +#include #include #include +#include using namespace quic; using namespace testing; @@ -682,5 +684,82 @@ TEST_F(QuicPacketSchedulerTest, LargestAckToSend) { EXPECT_EQ(folly::none, largestAckToSend(conn.ackStates.appDataAckState)); } +TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) { + QuicClientConnectionState conn; + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000; + auto connId = getTestConnectionId(); + StreamFrameScheduler scheduler(conn); + ShortHeader shortHeader( + ProtectionType::KeyPhaseZero, + connId, + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder( + conn.udpSendPacketLen, + std::move(shortHeader), + conn.ackStates.appDataAckState.largestAckedByPeer); + auto stream1 = conn.streamManager->createNextBidirectionalStream().value(); + auto stream2 = conn.streamManager->createNextBidirectionalStream().value(); + auto stream3 = conn.streamManager->createNextBidirectionalStream().value(); + writeDataToQuicStream(*stream1, folly::IOBuf::copyBuffer("some data"), false); + writeDataToQuicStream(*stream2, folly::IOBuf::copyBuffer("some data"), false); + writeDataToQuicStream(*stream3, folly::IOBuf::copyBuffer("some data"), false); + scheduler.writeStreams(builder); + EXPECT_EQ(conn.schedulingState.nextScheduledStream, 0); +} + +TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) { + QuicClientConnectionState conn; + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000; + auto connId = getTestConnectionId(); + StreamFrameScheduler scheduler(conn); + ShortHeader shortHeader1( + ProtectionType::KeyPhaseZero, + connId, + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder1( + conn.udpSendPacketLen, + std::move(shortHeader1), + conn.ackStates.appDataAckState.largestAckedByPeer); + auto stream1 = conn.streamManager->createNextBidirectionalStream().value(); + auto stream2 = conn.streamManager->createNextBidirectionalStream().value(); + auto stream3 = conn.streamManager->createNextBidirectionalStream().value(); + auto largeBuf = folly::IOBuf::createChain(conn.udpSendPacketLen * 2, 4096); + auto curBuf = largeBuf.get(); + do { + curBuf->append(curBuf->capacity()); + curBuf = curBuf->next(); + } while (curBuf != largeBuf.get()); + auto chainLen = largeBuf->computeChainDataLength(); + writeDataToQuicStream(*stream1, std::move(largeBuf), false); + writeDataToQuicStream(*stream2, folly::IOBuf::copyBuffer("some data"), false); + writeDataToQuicStream(*stream3, folly::IOBuf::copyBuffer("some data"), false); + scheduler.writeStreams(builder1); + EXPECT_EQ(conn.schedulingState.nextScheduledStream, 4); + + // Should write frames for stream2, stream3, followed by stream1 again. + MockQuicPacketBuilder builder2; + EXPECT_CALL(builder2, remainingSpaceInPkt()).WillRepeatedly(Return(4096)); + EXPECT_CALL(builder2, appendFrame(_)).WillRepeatedly(Invoke([&](auto f) { + builder2.frames_.push_back(f); + })); + scheduler.writeStreams(builder2); + auto& frames = builder2.frames_; + ASSERT_EQ(frames.size(), 3); + auto packet = builder2.frames_; + WriteStreamFrame f1(stream2->id, 0, 9, false); + WriteStreamFrame f2(stream3->id, 0, 9, false); + WriteStreamFrame f3(stream1->id, 0, chainLen, false); + ASSERT_TRUE(frames[0].asWriteStreamFrame()); + EXPECT_EQ(*frames[0].asWriteStreamFrame(), f1); + ASSERT_TRUE(frames[1].asWriteStreamFrame()); + EXPECT_EQ(*frames[1].asWriteStreamFrame(), f2); + ASSERT_TRUE(frames[2].asWriteStreamFrame()); + EXPECT_EQ(*frames[2].asWriteStreamFrame(), f3); +} + } // namespace test } // namespace quic diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 7480ea4fb..220b1eb50 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -2338,7 +2338,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) { conn.outstandingPackets.clear(); // Start from stream2 instead of stream1 - conn.schedulingState.lastScheduledStream = s2; + conn.schedulingState.nextScheduledStream = s2; writableBytes = kDefaultUDPSendPacketLen - 100; EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); @@ -2361,7 +2361,7 @@ TEST_F(QuicTransportTest, WriteStreamFromMiddleOfMap) { conn.outstandingPackets.clear(); // Test wrap around - conn.schedulingState.lastScheduledStream = s2; + conn.schedulingState.nextScheduledStream = s2; writableBytes = kDefaultUDPSendPacketLen; EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); writeQuicDataToSocket( diff --git a/quic/state/StateData.h b/quic/state/StateData.h index 27a602e72..4e6e00a8d 100644 --- a/quic/state/StateData.h +++ b/quic/state/StateData.h @@ -615,7 +615,7 @@ struct QuicConnectionStateBase { uint64_t udpSendPacketLen{kDefaultUDPSendPacketLen}; struct PacketSchedulingState { - StreamId lastScheduledStream{0}; + StreamId nextScheduledStream{0}; }; PacketSchedulingState schedulingState;