mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-07-30 14:43:05 +03:00
Use new PriorityQueue interface
Summary: Migrating mvfst priority API to be abstract, based on new classes is quic/priority. For now, it requires applications use `HTTPPriorityQueue::Priority`, to be compatible with the hardcoded `deprecated::PriorityQueue` implementation and apps cannot yet change the queue impl. Eventually the application will have full control of the queue. There are minor functional changes in this diff: 1. Priority QLog types changed from int/bool to string 2. Any PAUSED stream has priority `u=7,i` if paused streams are disabled (previously explicitly settable to any priority) Reviewed By: jbeshay Differential Revision: D68696110 fbshipit-source-id: 5a4721b08248ac75d725f51b5cb3e5d5de206d86
This commit is contained in:
committed by
Facebook GitHub Bot
parent
a231536f6c
commit
444a0f261b
@ -88,8 +88,8 @@ mvfst_cpp_library(
|
||||
"//quic/congestion_control:newreno",
|
||||
"//quic/observer:socket_observer_container",
|
||||
"//quic/observer:socket_observer_types",
|
||||
"//quic/priority:priority_queue",
|
||||
"//quic/state:quic_connection_stats",
|
||||
"//quic/state:quic_priority_queue",
|
||||
"//quic/state:quic_state_machine",
|
||||
"//quic/state:quic_stream_utilities",
|
||||
"//quic/state:retransmission_policy",
|
||||
|
@ -436,14 +436,14 @@ StreamId StreamFrameScheduler::writeStreamsHelper(
|
||||
|
||||
void StreamFrameScheduler::writeStreamsHelper(
|
||||
PacketBuilderInterface& builder,
|
||||
PriorityQueue& writableStreams,
|
||||
deprecated::PriorityQueue& writableStreams,
|
||||
uint64_t& connWritableBytes,
|
||||
bool streamPerPacket) {
|
||||
// Fill a packet with non-control stream data, in priority order
|
||||
for (size_t index = 0; index < writableStreams.levels.size() &&
|
||||
builder.remainingSpaceInPkt() > 0;
|
||||
index++) {
|
||||
PriorityQueue::Level& level = writableStreams.levels[index];
|
||||
deprecated::PriorityQueue::Level& level = writableStreams.levels[index];
|
||||
if (level.empty()) {
|
||||
// No data here, keep going
|
||||
continue;
|
||||
|
@ -109,7 +109,7 @@ class StreamFrameScheduler {
|
||||
|
||||
void writeStreamsHelper(
|
||||
PacketBuilderInterface& builder,
|
||||
PriorityQueue& writableStreams,
|
||||
deprecated::PriorityQueue& writableStreams,
|
||||
uint64_t& connWritableBytes,
|
||||
bool streamPerPacket);
|
||||
|
||||
|
@ -18,8 +18,8 @@
|
||||
#include <quic/congestion_control/Bandwidth.h>
|
||||
#include <quic/observer/SocketObserverContainer.h>
|
||||
#include <quic/observer/SocketObserverTypes.h>
|
||||
#include <quic/priority/PriorityQueue.h>
|
||||
#include <quic/state/QuicConnectionStats.h>
|
||||
#include <quic/state/QuicPriorityQueue.h>
|
||||
#include <quic/state/QuicStreamGroupRetransmissionPolicy.h>
|
||||
#include <quic/state/QuicStreamUtilities.h>
|
||||
#include <quic/state/StateData.h>
|
||||
@ -157,8 +157,8 @@ class QuicSocket : virtual public QuicSocketLite {
|
||||
/**
|
||||
* Get stream priority.
|
||||
*/
|
||||
virtual folly::Expected<Priority, LocalErrorCode> getStreamPriority(
|
||||
StreamId id) = 0;
|
||||
virtual folly::Expected<PriorityQueue::Priority, LocalErrorCode>
|
||||
getStreamPriority(StreamId id) = 0;
|
||||
|
||||
/**
|
||||
* Convenience function that sets the read callbacks of all streams to be
|
||||
|
@ -783,24 +783,18 @@ class QuicSocketLite {
|
||||
*/
|
||||
[[nodiscard]] virtual bool isKnobSupported() const = 0;
|
||||
|
||||
/**
|
||||
* Set stream priority.
|
||||
* level: can only be in [0, 7].
|
||||
/*
|
||||
* Set the priority queue implementation.
|
||||
*/
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
setStreamPriority(StreamId id, PriorityLevel level, bool incremental) {
|
||||
return setStreamPriority(id, Priority(level, incremental));
|
||||
}
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode> setPriorityQueue(
|
||||
std::unique_ptr<PriorityQueue> queue) = 0;
|
||||
|
||||
/**
|
||||
* Set stream priority.
|
||||
* level: can only be in [0, 7].
|
||||
* incremental: true/false
|
||||
* orderId: uint64
|
||||
*/
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode> setStreamPriority(
|
||||
StreamId id,
|
||||
Priority priority) = 0;
|
||||
PriorityQueue::Priority pri) = 0;
|
||||
|
||||
/**
|
||||
* Sets the maximum pacing rate in Bytes per second to be used
|
||||
|
@ -681,8 +681,8 @@ QuicTransportBase::readDatagramBufs(size_t atMost) {
|
||||
return retDatagrams;
|
||||
}
|
||||
|
||||
folly::Expected<Priority, LocalErrorCode> QuicTransportBase::getStreamPriority(
|
||||
StreamId id) {
|
||||
folly::Expected<PriorityQueue::Priority, LocalErrorCode>
|
||||
QuicTransportBase::getStreamPriority(StreamId id) {
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
|
@ -142,7 +142,7 @@ class QuicTransportBase : public QuicSocket,
|
||||
|
||||
// Subclass API.
|
||||
|
||||
folly::Expected<Priority, LocalErrorCode> getStreamPriority(
|
||||
folly::Expected<PriorityQueue::Priority, LocalErrorCode> getStreamPriority(
|
||||
StreamId id) override;
|
||||
|
||||
/**
|
||||
|
@ -811,23 +811,27 @@ const std::shared_ptr<QLogger> QuicTransportBaseLite::getQLogger() const {
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBaseLite::setStreamPriority(StreamId id, Priority priority) {
|
||||
QuicTransportBaseLite::setPriorityQueue(std::unique_ptr<PriorityQueue> queue) {
|
||||
if (conn_) {
|
||||
return conn_->streamManager->setPriorityQueue(std::move(queue));
|
||||
}
|
||||
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBaseLite::setStreamPriority(
|
||||
StreamId id,
|
||||
PriorityQueue::Priority priority) {
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
if (priority.level > kDefaultMaxPriority) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
// It's not an error to try to prioritize a non-existent stream.
|
||||
return folly::unit;
|
||||
}
|
||||
// It's not an error to prioritize a stream after it's sent its FIN - this
|
||||
// can reprioritize retransmissions.
|
||||
bool updated = conn_->streamManager->setStreamPriority(id, priority);
|
||||
if (updated && conn_->qLogger) {
|
||||
conn_->qLogger->addPriorityUpdate(id, priority.level, priority.incremental);
|
||||
}
|
||||
conn_->streamManager->setStreamPriority(id, priority, conn_->qLogger);
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
|
@ -214,9 +214,12 @@ class QuicTransportBaseLite : virtual public QuicSocketLite,
|
||||
*/
|
||||
[[nodiscard]] bool isKnobSupported() const override;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> setPriorityQueue(
|
||||
std::unique_ptr<PriorityQueue> queue) override;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> setStreamPriority(
|
||||
StreamId id,
|
||||
Priority priority) override;
|
||||
PriorityQueue::Priority priority) override;
|
||||
|
||||
/**
|
||||
* Sets the maximum pacing rate in Bytes per second to be used
|
||||
|
@ -59,6 +59,7 @@ mvfst_cpp_test(
|
||||
"//quic/dsr/test:mocks",
|
||||
"//quic/handshake/test:mocks",
|
||||
"//quic/logging/test:mocks",
|
||||
"//quic/priority:http_priority_queue",
|
||||
"//quic/server/state:server",
|
||||
"//quic/state:stream_functions",
|
||||
"//quic/state/stream:stream",
|
||||
@ -129,6 +130,7 @@ mvfst_cpp_test(
|
||||
"//quic/dsr/test:mocks",
|
||||
"//quic/fizz/client/handshake:fizz_client_handshake",
|
||||
"//quic/fizz/server/handshake:fizz_server_handshake",
|
||||
"//quic/priority:http_priority_queue",
|
||||
"//quic/server/state:server",
|
||||
"//quic/state:stream_functions",
|
||||
"//quic/state/test:mocks",
|
||||
|
@ -130,9 +130,13 @@ class MockQuicSocket : public QuicSocket {
|
||||
MOCK_METHOD(
|
||||
(folly::Expected<folly::Unit, LocalErrorCode>),
|
||||
setStreamPriority,
|
||||
(StreamId, Priority));
|
||||
(StreamId, PriorityQueue::Priority));
|
||||
MOCK_METHOD(
|
||||
(folly::Expected<Priority, LocalErrorCode>),
|
||||
(folly::Expected<folly::Unit, LocalErrorCode>),
|
||||
setPriorityQueue,
|
||||
(std::unique_ptr<PriorityQueue> queue));
|
||||
MOCK_METHOD(
|
||||
(folly::Expected<PriorityQueue::Priority, LocalErrorCode>),
|
||||
getStreamPriority,
|
||||
(StreamId));
|
||||
MOCK_METHOD(
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include <quic/dsr/test/Mocks.h>
|
||||
#include <quic/fizz/client/handshake/FizzClientQuicHandshakeContext.h>
|
||||
#include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
|
||||
#include <quic/priority/HTTPPriorityQueue.h>
|
||||
#include <quic/server/state/ServerStateMachine.h>
|
||||
#include <quic/state/QuicStreamFunctions.h>
|
||||
#include <quic/state/test/MockQuicStats.h>
|
||||
@ -120,7 +121,7 @@ createConn(uint32_t maxStreams, uint64_t maxOffset, uint64_t initialMaxOffset) {
|
||||
|
||||
auto createStream(
|
||||
QuicClientConnectionState& conn,
|
||||
std::optional<Priority> priority = std::nullopt) {
|
||||
std::optional<HTTPPriorityQueue::Priority> priority = std::nullopt) {
|
||||
auto stream = conn.streamManager->createNextBidirectionalStream().value();
|
||||
if (priority) {
|
||||
stream->priority = *priority;
|
||||
@ -232,11 +233,19 @@ void verifyStreamFrames(
|
||||
|
||||
namespace quic::test {
|
||||
|
||||
class QuicPacketSchedulerTest : public testing::Test {
|
||||
class QuicPacketSchedulerTestBase {
|
||||
public:
|
||||
QuicVersion version{QuicVersion::MVFST};
|
||||
};
|
||||
|
||||
class QuicPacketSchedulerTest : public QuicPacketSchedulerTestBase,
|
||||
public testing::Test {
|
||||
public:
|
||||
StreamId nextScheduledStreamID(QuicConnectionStateBase& conn) {
|
||||
return conn.streamManager->writeQueue().getNextScheduledStream();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, CryptoPaddingInitialPacket) {
|
||||
QuicClientConnectionState conn(
|
||||
FizzClientQuicHandshakeContext::Builder().build());
|
||||
@ -1499,9 +1508,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerAllFit) {
|
||||
auto builder = setupMockPacketBuilder();
|
||||
scheduler.writeStreams(*builder);
|
||||
verifyStreamFrames(*builder, {f1, f2, f3});
|
||||
EXPECT_EQ(
|
||||
conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
|
||||
0);
|
||||
EXPECT_EQ(nextScheduledStreamID(conn), 0);
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobin) {
|
||||
@ -1548,11 +1555,11 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinNextsPer) {
|
||||
setupMockPacketBuilder({1500, 0, 1400, 0, 1300, 1100, 1000, 0});
|
||||
scheduler.writeStreams(*builder2);
|
||||
builder2->advanceRemaining();
|
||||
ASSERT_EQ(conn.streamManager->writeQueue().getNextScheduledStream(), stream1);
|
||||
ASSERT_EQ(nextScheduledStreamID(conn), stream1);
|
||||
ASSERT_EQ(builder2->frames_.size(), 1);
|
||||
scheduler.writeStreams(*builder2);
|
||||
ASSERT_EQ(builder2->frames_.size(), 2);
|
||||
ASSERT_EQ(conn.streamManager->writeQueue().getNextScheduledStream(), stream2);
|
||||
ASSERT_EQ(nextScheduledStreamID(conn), stream2);
|
||||
builder2->advanceRemaining();
|
||||
scheduler.writeStreams(*builder2);
|
||||
scheduler.writeStreams(*builder2);
|
||||
@ -1576,9 +1583,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinStreamPerPacket) {
|
||||
|
||||
auto builder = createPacketBuilder(conn);
|
||||
scheduler.writeStreams(builder);
|
||||
EXPECT_EQ(
|
||||
conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
|
||||
stream2);
|
||||
EXPECT_EQ(nextScheduledStreamID(conn), stream2);
|
||||
|
||||
// Should write frames for stream2, stream3, followed by stream1 again.
|
||||
auto builder2 = setupMockPacketBuilder();
|
||||
@ -1634,9 +1639,7 @@ TEST_F(
|
||||
auto builder1 = createPacketBuilder(conn);
|
||||
scheduler.writeStreams(builder1);
|
||||
|
||||
EXPECT_EQ(
|
||||
conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
|
||||
stream2);
|
||||
EXPECT_EQ(nextScheduledStreamID(conn), stream2);
|
||||
|
||||
// Should write frames for stream2, stream3, followed by an empty write.
|
||||
auto builder2 = setupMockPacketBuilder();
|
||||
@ -1657,9 +1660,9 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
|
||||
auto& conn = *connPtr;
|
||||
StreamFrameScheduler scheduler(conn);
|
||||
|
||||
auto stream1 = createStream(conn, Priority(0, false));
|
||||
auto stream2 = createStream(conn, Priority(0, false));
|
||||
auto stream3 = createStream(conn, Priority(0, false));
|
||||
auto stream1 = createStream(conn, HTTPPriorityQueue::Priority(0, false));
|
||||
auto stream2 = createStream(conn, HTTPPriorityQueue::Priority(0, false));
|
||||
auto stream3 = createStream(conn, HTTPPriorityQueue::Priority(0, false));
|
||||
|
||||
auto largeBuf = createLargeBuffer(conn.udpSendPacketLen * 2);
|
||||
auto f1 = writeDataToStream(conn, stream1, std::move(largeBuf));
|
||||
@ -1670,10 +1673,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
|
||||
auto builder1 = createPacketBuilder(conn);
|
||||
scheduler.writeStreams(builder1);
|
||||
|
||||
EXPECT_EQ(
|
||||
conn.streamManager->writeQueue().getNextScheduledStream(
|
||||
Priority(0, false)),
|
||||
stream1);
|
||||
EXPECT_EQ(nextScheduledStreamID(conn), stream1);
|
||||
|
||||
// Should write frames for stream1, stream2, stream3, in that order.
|
||||
auto builder2 = setupMockPacketBuilder();
|
||||
@ -1685,7 +1685,8 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequential) {
|
||||
TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) {
|
||||
auto connPtr = createConn(10, 100000, 100000);
|
||||
auto& conn = *connPtr;
|
||||
conn.transportSettings.defaultPriority = Priority(0, false);
|
||||
conn.transportSettings.defaultPriority =
|
||||
HTTPPriorityQueue::Priority(0, false);
|
||||
StreamFrameScheduler scheduler(conn);
|
||||
|
||||
auto stream1 = createStream(conn);
|
||||
@ -1700,10 +1701,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerSequentialDefault) {
|
||||
auto builder1 = createPacketBuilder(conn);
|
||||
scheduler.writeStreams(builder1);
|
||||
|
||||
EXPECT_EQ(
|
||||
conn.streamManager->writeQueue().getNextScheduledStream(
|
||||
Priority(0, false)),
|
||||
stream1);
|
||||
EXPECT_EQ(nextScheduledStreamID(conn), stream1);
|
||||
|
||||
// Should write frames for stream1, stream2, stream3, in that order.
|
||||
auto builder2 = setupMockPacketBuilder();
|
||||
@ -1736,9 +1734,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) {
|
||||
auto builder1 = createPacketBuilder(conn);
|
||||
scheduler.writeStreams(builder1);
|
||||
|
||||
EXPECT_EQ(
|
||||
conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
|
||||
stream3);
|
||||
EXPECT_EQ(nextScheduledStreamID(conn), stream3);
|
||||
EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2);
|
||||
|
||||
// Should write frames for stream2, stream4, followed by stream 3 then 1.
|
||||
@ -1747,9 +1743,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRoundRobinControl) {
|
||||
|
||||
verifyStreamFrames(*builder2, {f2, f4, f3, f1});
|
||||
|
||||
EXPECT_EQ(
|
||||
conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
|
||||
stream3);
|
||||
EXPECT_EQ(nextScheduledStreamID(conn), stream3);
|
||||
EXPECT_EQ(conn.schedulingState.nextScheduledControlStream, stream2);
|
||||
}
|
||||
|
||||
@ -1764,9 +1758,7 @@ TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerOneStream) {
|
||||
auto builder1 = createPacketBuilder(conn);
|
||||
scheduler.writeStreams(builder1);
|
||||
|
||||
EXPECT_EQ(
|
||||
conn.streamManager->writeQueue().getNextScheduledStream(kDefaultPriority),
|
||||
0);
|
||||
EXPECT_EQ(nextScheduledStreamID(conn), 0);
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, StreamFrameSchedulerRemoveOne) {
|
||||
@ -1884,8 +1876,10 @@ TEST_F(QuicPacketSchedulerTest, HighPriNewDataBeforeLowPriLossData) {
|
||||
auto& conn = *connPtr;
|
||||
StreamFrameScheduler scheduler(conn);
|
||||
|
||||
auto lowPriStreamId = createStream(conn, Priority(5, false));
|
||||
auto highPriStreamId = createStream(conn, Priority(0, false));
|
||||
auto lowPriStreamId =
|
||||
createStream(conn, HTTPPriorityQueue::Priority(5, false));
|
||||
auto highPriStreamId =
|
||||
createStream(conn, HTTPPriorityQueue::Priority(0, false));
|
||||
|
||||
writeDataToStream(conn, lowPriStreamId, "Onegin");
|
||||
writeDataToStream(
|
||||
@ -2030,7 +2024,8 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) {
|
||||
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 1000;
|
||||
|
||||
auto streamId = (*conn.streamManager->createNextBidirectionalStream())->id;
|
||||
conn.streamManager->setStreamPriority(streamId, Priority(0, false));
|
||||
conn.streamManager->setStreamPriority(
|
||||
streamId, HTTPPriorityQueue::Priority(0, false));
|
||||
auto stream = conn.streamManager->findStream(streamId);
|
||||
auto data = buildRandomInputData(1000);
|
||||
ASSERT_FALSE(
|
||||
@ -2723,8 +2718,9 @@ TEST_F(QuicPacketSchedulerTest, RstStreamSchedulerReliableReset) {
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, PausedPriorityEnabled) {
|
||||
static const auto kSequentialPriority = Priority(3, false);
|
||||
static const auto kPausedPriority = Priority(0, false, 0, true /* paused */);
|
||||
static const auto kSequentialPriority = HTTPPriorityQueue::Priority(3, false);
|
||||
static const HTTPPriorityQueue::Priority kPausedPriority =
|
||||
HTTPPriorityQueue::Priority::PAUSED;
|
||||
|
||||
auto connPtr = createConn(10, 100000, 100000);
|
||||
auto& conn = *connPtr;
|
||||
@ -2757,8 +2753,9 @@ TEST_F(QuicPacketSchedulerTest, PausedPriorityEnabled) {
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, PausedPriorityDisabled) {
|
||||
static const auto kSequentialPriority = Priority(3, false);
|
||||
static const auto kPausedPriority = Priority(0, false, 0, true /* paused */);
|
||||
static const auto kSequentialPriority = HTTPPriorityQueue::Priority(3, false);
|
||||
static const HTTPPriorityQueue::Priority kPausedPriority =
|
||||
HTTPPriorityQueue::Priority::PAUSED;
|
||||
|
||||
auto connPtr = createConn(10, 100000, 100000);
|
||||
auto& conn = *connPtr;
|
||||
@ -2773,7 +2770,7 @@ TEST_F(QuicPacketSchedulerTest, PausedPriorityDisabled) {
|
||||
|
||||
auto builder = setupMockPacketBuilder();
|
||||
scheduler.writeStreams(*builder);
|
||||
verifyStreamFrames(*builder, {pausedFrame, regularFrame});
|
||||
verifyStreamFrames(*builder, {regularFrame, pausedFrame});
|
||||
}
|
||||
|
||||
TEST_F(QuicPacketSchedulerTest, FixedShortHeaderPadding) {
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include <quic/congestion_control/StaticCwndCongestionController.h>
|
||||
#include <quic/handshake/test/Mocks.h>
|
||||
#include <quic/logging/test/Mocks.h>
|
||||
#include <quic/priority/HTTPPriorityQueue.h>
|
||||
#include <quic/server/state/ServerStateMachine.h>
|
||||
#include <quic/state/QuicStreamFunctions.h>
|
||||
#include <quic/state/stream/StreamReceiveHandlers.h>
|
||||
@ -1648,7 +1649,7 @@ TEST_F(QuicTransportTest, WriteSmall) {
|
||||
EXPECT_CALL(*socket_, write(_, _, _))
|
||||
.WillOnce(testing::WithArgs<1, 2>(Invoke(getTotalIovecLen)));
|
||||
transport_->writeChain(stream, buf->clone(), false);
|
||||
transport_->setStreamPriority(stream, Priority(0, false));
|
||||
transport_->setStreamPriority(stream, HTTPPriorityQueue::Priority(0, false));
|
||||
loopForWrites();
|
||||
auto& conn = transport_->getConnectionState();
|
||||
verifyCorrectness(conn, 0, stream, *buf);
|
||||
@ -3101,7 +3102,8 @@ TEST_F(QuicTransportTest, NonWritableStreamAPI) {
|
||||
// Check that write-side APIs return an error
|
||||
auto res2 = transport_->notifyPendingWriteOnStream(streamId, &writeCallback_);
|
||||
EXPECT_EQ(LocalErrorCode::STREAM_CLOSED, res2.error());
|
||||
auto res3 = transport_->setStreamPriority(streamId, Priority(0, false));
|
||||
auto res3 = transport_->setStreamPriority(
|
||||
streamId, HTTPPriorityQueue::Priority(0, false));
|
||||
EXPECT_FALSE(res3.hasError());
|
||||
}
|
||||
|
||||
@ -5103,9 +5105,17 @@ TEST_F(QuicTransportTest, GetStreamPacketsTxedMultiplePackets) {
|
||||
|
||||
TEST_F(QuicTransportTest, PrioritySetAndGet) {
|
||||
auto stream = transport_->createBidirectionalStream().value();
|
||||
EXPECT_EQ(kDefaultPriority, transport_->getStreamPriority(stream).value());
|
||||
transport_->setStreamPriority(stream, Priority(0, false));
|
||||
EXPECT_EQ(Priority(0, false), transport_->getStreamPriority(stream).value());
|
||||
PriorityQueue::Priority basePri;
|
||||
HTTPPriorityQueue::Priority defaultPri(basePri);
|
||||
EXPECT_EQ(
|
||||
defaultPri,
|
||||
HTTPPriorityQueue::Priority(
|
||||
transport_->getStreamPriority(stream).value()));
|
||||
transport_->setStreamPriority(stream, HTTPPriorityQueue::Priority(0, false));
|
||||
EXPECT_EQ(
|
||||
HTTPPriorityQueue::Priority(0, false),
|
||||
HTTPPriorityQueue::Priority(
|
||||
transport_->getStreamPriority(stream).value()));
|
||||
auto nonExistStreamPri = transport_->getStreamPriority(stream + 4);
|
||||
EXPECT_TRUE(nonExistStreamPri.hasError());
|
||||
EXPECT_EQ(LocalErrorCode::STREAM_NOT_EXISTS, nonExistStreamPri.error());
|
||||
|
@ -30,8 +30,8 @@ DSRStreamFrameScheduler::enrichAndAddSendInstruction(
|
||||
DSRStreamFrameScheduler::SchedulingResult result,
|
||||
DSRPacketBuilderBase& packetBuilder,
|
||||
SendInstruction::Builder& instructionBuilder,
|
||||
const PriorityQueue& writeQueue,
|
||||
const PriorityQueue::LevelItr& levelIter,
|
||||
const deprecated::PriorityQueue& writeQueue,
|
||||
const deprecated::PriorityQueue::LevelItr& levelIter,
|
||||
QuicStreamState& stream) {
|
||||
enrichInstruction(instructionBuilder, stream);
|
||||
packetBuilder.addSendInstruction(
|
||||
|
@ -42,8 +42,8 @@ class DSRStreamFrameScheduler {
|
||||
SchedulingResult,
|
||||
DSRPacketBuilderBase&,
|
||||
SendInstruction::Builder&,
|
||||
const PriorityQueue&,
|
||||
const PriorityQueue::LevelItr&,
|
||||
const deprecated::PriorityQueue&,
|
||||
const deprecated::PriorityQueue::LevelItr&,
|
||||
QuicStreamState&);
|
||||
|
||||
private:
|
||||
|
@ -73,6 +73,7 @@ mvfst_cpp_test(
|
||||
"//folly/portability:gtest",
|
||||
"//quic/dsr/frontend:write_functions",
|
||||
"//quic/dsr/test:test_common",
|
||||
"//quic/priority:http_priority_queue",
|
||||
"//quic/state/test:mocks",
|
||||
],
|
||||
)
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <folly/portability/GTest.h>
|
||||
#include <quic/dsr/frontend/WriteFunctions.h>
|
||||
#include <quic/dsr/test/TestCommon.h>
|
||||
#include <quic/priority/HTTPPriorityQueue.h>
|
||||
#include <quic/state/test/Mocks.h>
|
||||
|
||||
using namespace testing;
|
||||
@ -263,8 +264,10 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) {
|
||||
auto streamId2 = prepareOneStream(1000);
|
||||
auto stream1 = conn_.streamManager->findStream(streamId1);
|
||||
auto stream2 = conn_.streamManager->findStream(streamId2);
|
||||
conn_.streamManager->setStreamPriority(streamId1, Priority{3, false});
|
||||
conn_.streamManager->setStreamPriority(streamId2, Priority{3, false});
|
||||
conn_.streamManager->setStreamPriority(
|
||||
streamId1, HTTPPriorityQueue::Priority{3, false});
|
||||
conn_.streamManager->setStreamPriority(
|
||||
streamId2, HTTPPriorityQueue::Priority{3, false});
|
||||
// Pretend we sent the non DSR data on first stream
|
||||
stream1->ackedIntervals.insert(0, stream1->writeBuffer.chainLength() - 1);
|
||||
stream1->currentWriteOffset = stream1->writeBuffer.chainLength();
|
||||
@ -289,8 +292,10 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsIncremental) {
|
||||
auto streamId2 = prepareOneStream(1000);
|
||||
auto stream1 = conn_.streamManager->findStream(streamId1);
|
||||
auto stream2 = conn_.streamManager->findStream(streamId2);
|
||||
conn_.streamManager->setStreamPriority(streamId1, Priority{3, true});
|
||||
conn_.streamManager->setStreamPriority(streamId2, Priority{3, true});
|
||||
conn_.streamManager->setStreamPriority(
|
||||
streamId1, HTTPPriorityQueue::Priority{3, true});
|
||||
conn_.streamManager->setStreamPriority(
|
||||
streamId2, HTTPPriorityQueue::Priority{3, true});
|
||||
// Pretend we sent the non DSR data on second stream
|
||||
stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1);
|
||||
stream2->currentWriteOffset = stream2->writeBuffer.chainLength();
|
||||
|
@ -28,6 +28,7 @@ mvfst_cpp_library(
|
||||
exported_deps = [
|
||||
":qlogger_constants",
|
||||
"//quic/codec:types",
|
||||
"//quic/priority:priority_queue",
|
||||
],
|
||||
)
|
||||
|
||||
@ -49,6 +50,7 @@ mvfst_cpp_library(
|
||||
":qlogger_constants",
|
||||
"//folly:dynamic",
|
||||
"//quic/codec:types",
|
||||
"//quic/priority:priority_queue",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -503,12 +503,11 @@ void FileQLogger::addPathValidationEvent(bool success) {
|
||||
|
||||
void FileQLogger::addPriorityUpdate(
|
||||
quic::StreamId streamId,
|
||||
uint8_t urgency,
|
||||
bool incremental) {
|
||||
PriorityQueue::PriorityLogFields priority) {
|
||||
auto refTime = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::steady_clock::now().time_since_epoch());
|
||||
handleEvent(std::make_unique<quic::QLogPriorityUpdateEvent>(
|
||||
streamId, urgency, incremental, refTime));
|
||||
streamId, std::move(priority), refTime));
|
||||
}
|
||||
|
||||
void FileQLogger::addL4sWeightUpdate(
|
||||
|
@ -111,8 +111,7 @@ class FileQLogger : public BaseQLogger {
|
||||
virtual void addPathValidationEvent(bool success) override;
|
||||
void addPriorityUpdate(
|
||||
quic::StreamId streamId,
|
||||
uint8_t urgency,
|
||||
bool incremental) override;
|
||||
PriorityQueue::PriorityLogFields priority) override;
|
||||
void addL4sWeightUpdate(double l4sWeight, uint32_t newEct1, uint32_t newCe)
|
||||
override;
|
||||
void addNetworkPathModelUpdate(
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <quic/codec/QuicConnectionId.h>
|
||||
#include <quic/codec/Types.h>
|
||||
#include <quic/logging/QLoggerConstants.h>
|
||||
#include <quic/priority/PriorityQueue.h>
|
||||
|
||||
namespace quic {
|
||||
|
||||
@ -120,8 +121,7 @@ class QLogger {
|
||||
virtual void addPathValidationEvent(bool success) = 0;
|
||||
virtual void addPriorityUpdate(
|
||||
quic::StreamId streamId,
|
||||
uint8_t urgency,
|
||||
bool incremental) = 0;
|
||||
PriorityQueue::PriorityLogFields priority) = 0;
|
||||
virtual void
|
||||
addL4sWeightUpdate(double l4sWeight, uint32_t newEct1, uint32_t newCe) = 0;
|
||||
virtual void addNetworkPathModelUpdate(
|
||||
|
@ -261,12 +261,11 @@ void QLoggerCommon::addPathValidationEvent(bool success) {
|
||||
|
||||
void QLoggerCommon::addPriorityUpdate(
|
||||
quic::StreamId streamId,
|
||||
uint8_t urgency,
|
||||
bool incremental) {
|
||||
PriorityQueue::PriorityLogFields priority) {
|
||||
auto refTime = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::steady_clock::now().time_since_epoch());
|
||||
logTrace(std::make_unique<quic::QLogPriorityUpdateEvent>(
|
||||
streamId, urgency, incremental, refTime));
|
||||
streamId, std::move(priority), refTime));
|
||||
}
|
||||
|
||||
void QLoggerCommon::addL4sWeightUpdate(
|
||||
|
@ -85,8 +85,7 @@ class QLoggerCommon : public quic::BaseQLogger {
|
||||
void addPathValidationEvent(bool success) override;
|
||||
void addPriorityUpdate(
|
||||
quic::StreamId streamId,
|
||||
uint8_t urgency,
|
||||
bool incremental) override;
|
||||
PriorityQueue::PriorityLogFields priority) override;
|
||||
void addL4sWeightUpdate(double l4sWeight, uint32_t newEct1, uint32_t newCe)
|
||||
override;
|
||||
void addNetworkPathModelUpdate(
|
||||
|
@ -970,10 +970,9 @@ folly::dynamic QLogPathValidationEvent::toDynamic() const {
|
||||
|
||||
QLogPriorityUpdateEvent::QLogPriorityUpdateEvent(
|
||||
StreamId streamId,
|
||||
uint8_t urgency,
|
||||
bool incremental,
|
||||
PriorityQueue::PriorityLogFields priority,
|
||||
std::chrono::microseconds refTimeIn)
|
||||
: streamId_(streamId), urgency_(urgency), incremental_(incremental) {
|
||||
: streamId_(streamId), priority_(std::move(priority)) {
|
||||
eventType = QLogEventType::PriorityUpdate;
|
||||
refTime = refTimeIn;
|
||||
}
|
||||
@ -984,8 +983,9 @@ folly::dynamic QLogPriorityUpdateEvent::toDynamic() const {
|
||||
folly::dynamic data = folly::dynamic::object();
|
||||
|
||||
data["id"] = streamId_;
|
||||
data["urgency"] = urgency_;
|
||||
data["incremental"] = incremental_;
|
||||
for (const auto& entry : priority_) {
|
||||
data[entry.first] = entry.second;
|
||||
}
|
||||
d.push_back(std::move(data));
|
||||
return d;
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <folly/json/dynamic.h> // @manual=//folly:dynamic
|
||||
#include <quic/codec/Types.h>
|
||||
#include <quic/logging/QLoggerConstants.h>
|
||||
#include <quic/priority/PriorityQueue.h>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
@ -786,8 +787,7 @@ class QLogPriorityUpdateEvent : public QLogEvent {
|
||||
public:
|
||||
explicit QLogPriorityUpdateEvent(
|
||||
StreamId id,
|
||||
uint8_t urgency,
|
||||
bool incremental,
|
||||
PriorityQueue::PriorityLogFields priority,
|
||||
std::chrono::microseconds refTimeIn);
|
||||
~QLogPriorityUpdateEvent() override = default;
|
||||
|
||||
@ -795,8 +795,7 @@ class QLogPriorityUpdateEvent : public QLogEvent {
|
||||
|
||||
private:
|
||||
StreamId streamId_;
|
||||
uint8_t urgency_;
|
||||
bool incremental_;
|
||||
PriorityQueue::PriorityLogFields priority_;
|
||||
};
|
||||
|
||||
class QLogL4sWeightUpdateEvent : public QLogEvent {
|
||||
|
@ -67,7 +67,10 @@ class MockQLogger : public QLogger {
|
||||
MOCK_METHOD(void, addPathValidationEvent, (bool));
|
||||
MOCK_METHOD(void, setDcid, (Optional<ConnectionId>));
|
||||
MOCK_METHOD(void, setScid, (Optional<ConnectionId>));
|
||||
MOCK_METHOD(void, addPriorityUpdate, (quic::StreamId, uint8_t, bool));
|
||||
MOCK_METHOD(
|
||||
void,
|
||||
addPriorityUpdate,
|
||||
(quic::StreamId, PriorityQueue::PriorityLogFields));
|
||||
MOCK_METHOD(
|
||||
void,
|
||||
addL4sWeightUpdate,
|
||||
|
@ -62,6 +62,7 @@ target_compile_options(
|
||||
target_link_libraries(
|
||||
mvfst_http_priority_queue PUBLIC
|
||||
Folly::folly
|
||||
mvfst_round_robin
|
||||
)
|
||||
|
||||
file(
|
||||
|
@ -75,6 +75,7 @@ mvfst_cpp_library(
|
||||
"//quic/dsr/frontend:write_functions",
|
||||
"//quic/fizz/handshake:fizz_handshake",
|
||||
"//quic/fizz/server/handshake:fizz_server_handshake",
|
||||
"//quic/priority:http_priority_queue",
|
||||
"//quic/server/handshake:app_token",
|
||||
"//quic/server/handshake:default_app_token_validator",
|
||||
"//quic/server/handshake:stateless_reset_generator",
|
||||
@ -83,7 +84,9 @@ mvfst_cpp_library(
|
||||
"//quic/state:quic_stream_utilities",
|
||||
"//quic/state:transport_settings_functions",
|
||||
] + select({
|
||||
"DEFAULT": ["//folly/io/async:io_uring_backend"],
|
||||
"DEFAULT": [
|
||||
"//folly/io/async:io_uring_backend",
|
||||
],
|
||||
"ovr_config//os:windows": [],
|
||||
}),
|
||||
exported_deps = [
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <quic/congestion_control/ServerCongestionControllerFactory.h>
|
||||
#include <quic/dsr/frontend/WriteFunctions.h>
|
||||
#include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
|
||||
#include <quic/priority/HTTPPriorityQueue.h>
|
||||
#include <quic/server/QuicServerTransport.h>
|
||||
#include <quic/server/handshake/AppToken.h>
|
||||
#include <quic/server/handshake/DefaultAppTokenValidator.h>
|
||||
@ -1067,7 +1068,7 @@ void QuicServerTransport::registerAllTransportKnobParamHandlers() {
|
||||
} catch (std::exception&) {
|
||||
parseSuccess = false;
|
||||
}
|
||||
if (!parseSuccess) {
|
||||
if (!parseSuccess || level > 7) {
|
||||
auto errMsg = fmt::format(
|
||||
"Received invalid KnobParam for DEFAULT_STREAM_PRIORITY: {}",
|
||||
val);
|
||||
@ -1075,7 +1076,7 @@ void QuicServerTransport::registerAllTransportKnobParamHandlers() {
|
||||
throw std::runtime_error(errMsg);
|
||||
}
|
||||
serverConn->transportSettings.defaultPriority =
|
||||
Priority(level, incremental);
|
||||
HTTPPriorityQueue::Priority(level, incremental);
|
||||
VLOG(3) << "DEFAULT_STREAM_PRIORITY KnobParam received: " << val;
|
||||
});
|
||||
registerTransportKnobParamHandler(
|
||||
@ -1094,7 +1095,8 @@ void QuicServerTransport::registerAllTransportKnobParamHandlers() {
|
||||
auto val = std::get<uint64_t>(value);
|
||||
auto serverConn = serverTransport->serverConn_;
|
||||
serverConn->transportSettings.priorityQueueWritesPerStream = val;
|
||||
serverConn->streamManager->writeQueue().setMaxNextsPerStream(val);
|
||||
serverConn->streamManager->setWriteQueueMaxNextsPerStream(
|
||||
serverConn->transportSettings.priorityQueueWritesPerStream);
|
||||
VLOG(3) << "WRITES_PER_STREAM KnobParam received: " << val;
|
||||
});
|
||||
registerTransportKnobParamHandler(
|
||||
@ -1328,7 +1330,7 @@ QuicSocket::WriteResult QuicServerTransport::setDSRPacketizationRequestSender(
|
||||
// shown good results.
|
||||
if (conn_->transportSettings.priorityQueueWritesPerStream == 1) {
|
||||
conn_->transportSettings.priorityQueueWritesPerStream = 5;
|
||||
conn_->streamManager->writeQueue().setMaxNextsPerStream(5);
|
||||
conn_->streamManager->setWriteQueueMaxNextsPerStream(5);
|
||||
}
|
||||
|
||||
// Fow now, no appLimited or appIdle update here since we are not writing
|
||||
|
@ -92,6 +92,7 @@ fb_dirsync_cpp_unittest(
|
||||
"//quic/fizz/handshake:fizz_handshake",
|
||||
"//quic/fizz/server/handshake:fizz_server_handshake",
|
||||
"//quic/logging:file_qlogger",
|
||||
"//quic/priority:http_priority_queue",
|
||||
"//quic/server/handshake:server_handshake",
|
||||
"//quic/state:stream_functions",
|
||||
"//quic/state/test:mocks",
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <quic/dsr/test/Mocks.h>
|
||||
#include <quic/fizz/handshake/FizzCryptoFactory.h>
|
||||
#include <quic/logging/FileQLogger.h>
|
||||
#include <quic/priority/HTTPPriorityQueue.h>
|
||||
#include <quic/server/handshake/ServerHandshake.h>
|
||||
#include <quic/state/QuicStreamFunctions.h>
|
||||
#include <quic/state/test/Mocks.h>
|
||||
@ -5364,15 +5365,32 @@ TEST_F(QuicServerTransportTest, TestAckFrequencyPolicyKnobHandler) {
|
||||
server->handleKnobParams(
|
||||
{{static_cast<uint64_t>(TransportKnobParamId::DEFAULT_STREAM_PRIORITY),
|
||||
"1,1"}});
|
||||
EXPECT_EQ(server->getTransportSettings().defaultPriority, Priority(1, true));
|
||||
EXPECT_EQ(
|
||||
HTTPPriorityQueue::Priority(
|
||||
server->getTransportSettings().defaultPriority),
|
||||
HTTPPriorityQueue::Priority(1, true));
|
||||
server->handleKnobParams(
|
||||
{{static_cast<uint64_t>(TransportKnobParamId::DEFAULT_STREAM_PRIORITY),
|
||||
"4,0"}});
|
||||
EXPECT_EQ(server->getTransportSettings().defaultPriority, Priority(4, false));
|
||||
EXPECT_EQ(
|
||||
HTTPPriorityQueue::Priority(
|
||||
server->getTransportSettings().defaultPriority),
|
||||
HTTPPriorityQueue::Priority(4, false));
|
||||
server->handleKnobParams(
|
||||
{{static_cast<uint64_t>(TransportKnobParamId::DEFAULT_STREAM_PRIORITY),
|
||||
"4,0,10"}});
|
||||
EXPECT_EQ(server->getTransportSettings().defaultPriority, Priority(4, false));
|
||||
EXPECT_EQ(
|
||||
HTTPPriorityQueue::Priority(
|
||||
server->getTransportSettings().defaultPriority),
|
||||
HTTPPriorityQueue::Priority(4, false));
|
||||
// level too large, unchanged
|
||||
server->handleKnobParams(
|
||||
{{static_cast<uint64_t>(TransportKnobParamId::DEFAULT_STREAM_PRIORITY),
|
||||
"20,0"}});
|
||||
EXPECT_EQ(
|
||||
HTTPPriorityQueue::Priority(
|
||||
server->getTransportSettings().defaultPriority),
|
||||
HTTPPriorityQueue::Priority(4, false));
|
||||
server->handleKnobParams(
|
||||
{{static_cast<uint64_t>(TransportKnobParamId::WRITE_LOOP_TIME_FRACTION),
|
||||
uint64_t(2)}});
|
||||
|
@ -32,9 +32,9 @@ mvfst_cpp_library(
|
||||
"TransportSettings.h",
|
||||
],
|
||||
exported_deps = [
|
||||
":quic_priority_queue",
|
||||
"//quic:constants",
|
||||
"//quic/common:optional",
|
||||
"//quic/priority:priority_queue",
|
||||
],
|
||||
)
|
||||
|
||||
@ -108,6 +108,9 @@ mvfst_cpp_library(
|
||||
"StateData.h",
|
||||
"StreamData.h",
|
||||
],
|
||||
deps = [
|
||||
"//quic/priority:http_priority_queue",
|
||||
],
|
||||
exported_deps = [
|
||||
":ack_event",
|
||||
":ack_states",
|
||||
@ -136,6 +139,7 @@ mvfst_cpp_library(
|
||||
"//quic/handshake:handshake",
|
||||
"//quic/logging:qlogger",
|
||||
"//quic/observer:socket_observer_types",
|
||||
"//quic/priority:priority_queue",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -49,6 +49,7 @@ target_link_libraries(
|
||||
mvfst_codec_types
|
||||
mvfst_dsr_sender
|
||||
mvfst_handshake
|
||||
mvfst_http_priority_queue
|
||||
)
|
||||
|
||||
add_library(
|
||||
|
@ -7,7 +7,7 @@
|
||||
|
||||
#include <quic/state/QuicPriorityQueue.h>
|
||||
|
||||
namespace quic {
|
||||
namespace quic::deprecated {
|
||||
|
||||
/**
|
||||
* Default priority, urgency = 3, incremental = true
|
||||
@ -15,4 +15,4 @@ namespace quic {
|
||||
*/
|
||||
const Priority kDefaultPriority(3, true);
|
||||
|
||||
} // namespace quic
|
||||
} // namespace quic::deprecated
|
||||
|
@ -13,7 +13,7 @@
|
||||
|
||||
#include <quic/codec/Types.h>
|
||||
|
||||
namespace quic {
|
||||
namespace quic::deprecated {
|
||||
|
||||
constexpr uint8_t kDefaultPriorityLevels = kDefaultMaxPriority + 1;
|
||||
constexpr uint8_t kDefaultPriorityLevelsSize = 2 * kDefaultPriorityLevels;
|
||||
@ -347,4 +347,4 @@ struct PriorityQueue {
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace quic
|
||||
} // namespace quic::deprecated
|
||||
|
@ -5,6 +5,8 @@
|
||||
* LICENSE file in the root directory of this source tree.
|
||||
*/
|
||||
|
||||
#include <quic/logging/QLogger.h>
|
||||
#include <quic/priority/HTTPPriorityQueue.h>
|
||||
#include <quic/state/QuicStreamManager.h>
|
||||
#include <quic/state/QuicStreamUtilities.h>
|
||||
#include <quic/state/QuicTransportStatsCallback.h>
|
||||
@ -103,6 +105,11 @@ static LocalErrorCode openLocalStreamIfNotClosed(
|
||||
return LocalErrorCode::NO_ERROR;
|
||||
}
|
||||
|
||||
void QuicStreamManager::setWriteQueueMaxNextsPerStream(
|
||||
uint64_t maxNextsPerStream) {
|
||||
writeQueue_.setMaxNextsPerStream(maxNextsPerStream);
|
||||
}
|
||||
|
||||
bool QuicStreamManager::streamExists(StreamId streamId) {
|
||||
if (isLocalStream(nodeType_, streamId)) {
|
||||
if (isUnidirectionalStream(streamId)) {
|
||||
@ -222,15 +229,22 @@ bool QuicStreamManager::consumeMaxLocalUnidirectionalStreamIdIncreased() {
|
||||
return res;
|
||||
}
|
||||
|
||||
bool QuicStreamManager::setStreamPriority(StreamId id, Priority newPriority) {
|
||||
bool QuicStreamManager::setStreamPriority(
|
||||
StreamId id,
|
||||
const PriorityQueue::Priority& newPriority,
|
||||
const std::shared_ptr<QLogger>& qLogger) {
|
||||
auto stream = findStream(id);
|
||||
if (stream) {
|
||||
if (stream->priority == newPriority) {
|
||||
static const HTTPPriorityQueue kPriorityQueue;
|
||||
if (kPriorityQueue.equalPriority(stream->priority, newPriority)) {
|
||||
return false;
|
||||
}
|
||||
stream->priority = newPriority;
|
||||
updateWritableStreams(*stream);
|
||||
writeQueue_.updateIfExist(id, stream->priority);
|
||||
if (qLogger) {
|
||||
qLogger->addPriorityUpdate(
|
||||
id, kPriorityQueue.toLogFields(stream->priority));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
@ -834,7 +848,9 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
|
||||
}
|
||||
|
||||
// Check if paused
|
||||
if (stream.priority.paused && !transportSettings_->disablePausedPriority) {
|
||||
// pausedButDisabled adds a hard dep on writeQueue being an HTTPPriorityQueue.
|
||||
auto httpPri = HTTPPriorityQueue::Priority(stream.priority);
|
||||
if (httpPri->paused && !transportSettings_->disablePausedPriority) {
|
||||
removeWritable(stream);
|
||||
return;
|
||||
}
|
||||
@ -866,7 +882,12 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
|
||||
if (stream.isControl) {
|
||||
controlWriteQueue_.emplace(stream.id);
|
||||
} else {
|
||||
writeQueue_.insertOrUpdate(stream.id, stream.priority);
|
||||
const static deprecated::Priority kPausedDisabledPriority(7, true);
|
||||
auto oldPri = httpPri->paused
|
||||
? kPausedDisabledPriority
|
||||
: deprecated::Priority(
|
||||
httpPri->urgency, httpPri->incremental, httpPri->order);
|
||||
writeQueue_.insertOrUpdate(stream.id, oldPri);
|
||||
}
|
||||
} else {
|
||||
// Not schedulable, remove from queues
|
||||
|
@ -11,11 +11,15 @@
|
||||
#include <folly/container/F14Set.h>
|
||||
#include <quic/QuicConstants.h>
|
||||
#include <quic/codec/Types.h>
|
||||
#include <quic/priority/PriorityQueue.h>
|
||||
#include <quic/state/QuicPriorityQueue.h>
|
||||
#include <quic/state/StreamData.h>
|
||||
#include <quic/state/TransportSettings.h>
|
||||
#include <numeric>
|
||||
|
||||
namespace quic {
|
||||
class QLogger;
|
||||
|
||||
namespace detail {
|
||||
|
||||
constexpr uint8_t kStreamIncrement = 0x04;
|
||||
@ -140,9 +144,6 @@ class QuicStreamManager {
|
||||
// Consider throwing here if construction must fail, or setting an error
|
||||
// state. For now, logging is consistent with previous changes.
|
||||
}
|
||||
|
||||
writeQueue_.setMaxNextsPerStream(
|
||||
transportSettings.priorityQueueWritesPerStream);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -238,6 +239,15 @@ class QuicStreamManager {
|
||||
conn_, // Use the new conn ref
|
||||
std::move(pair.second)));
|
||||
}
|
||||
// Call refreshTransportSettings which now returns Expected
|
||||
auto refreshResult = refreshTransportSettings(transportSettings);
|
||||
if (refreshResult.hasError()) {
|
||||
// Constructor cannot return error easily. Log or handle internally.
|
||||
LOG(ERROR) << "Failed initial transport settings refresh: "
|
||||
<< refreshResult.error().message;
|
||||
// Consider throwing here if construction must fail, or setting an error
|
||||
// state. For now, logging is consistent with previous changes.
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -431,10 +441,19 @@ class QuicStreamManager {
|
||||
lossStreams_.insert(id);
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> setPriorityQueue(
|
||||
std::unique_ptr<PriorityQueue>) {
|
||||
LOG(ERROR) << "setPriorityQueue is not supported yet";
|
||||
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update stream priority if the stream indicated by id exists.
|
||||
*/
|
||||
bool setStreamPriority(StreamId id, Priority priority);
|
||||
bool setStreamPriority(
|
||||
StreamId id,
|
||||
const PriorityQueue::Priority& priority,
|
||||
const std::shared_ptr<QLogger>& qLogger = nullptr);
|
||||
|
||||
auto& writableDSRStreams() {
|
||||
return writableDSRStreams_;
|
||||
@ -778,6 +797,8 @@ class QuicStreamManager {
|
||||
peerBidirectionalStreamGroupsSeen_.size();
|
||||
}
|
||||
|
||||
void setWriteQueueMaxNextsPerStream(uint64_t maxNextsPerStream);
|
||||
|
||||
private:
|
||||
void updateAppIdleState();
|
||||
|
||||
@ -857,7 +878,7 @@ class QuicStreamManager {
|
||||
folly::F14FastSet<StreamId> unidirectionalReadableStreams_;
|
||||
folly::F14FastSet<StreamId> peekableStreams_;
|
||||
|
||||
PriorityQueue writeQueue_;
|
||||
deprecated::PriorityQueue writeQueue_;
|
||||
std::set<StreamId> controlWriteQueue_;
|
||||
folly::F14FastSet<StreamId> writableStreams_;
|
||||
folly::F14FastSet<StreamId> writableDSRStreams_;
|
||||
|
@ -13,7 +13,7 @@
|
||||
#include <quic/codec/Types.h>
|
||||
#include <quic/common/SmallCollections.h>
|
||||
#include <quic/dsr/DSRPacketizationRequestSender.h>
|
||||
#include <quic/state/QuicPriorityQueue.h>
|
||||
#include <quic/priority/PriorityQueue.h>
|
||||
|
||||
namespace quic {
|
||||
|
||||
@ -493,7 +493,8 @@ struct QuicStreamState : public QuicStreamLike {
|
||||
// lastHolbTime indicates whether the stream is HOL blocked at the moment.
|
||||
uint32_t holbCount{0};
|
||||
|
||||
Priority priority{kDefaultPriority};
|
||||
// Uninitialized = default
|
||||
PriorityQueue::Priority priority;
|
||||
|
||||
// This monotonically increases by 1 this stream is written to packets. Note
|
||||
// that this is only used for DSR and facilitates loss detection.
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
#include <quic/QuicConstants.h>
|
||||
#include <quic/common/Optional.h>
|
||||
#include <quic/state/QuicPriorityQueue.h>
|
||||
#include <quic/priority/PriorityQueue.h>
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
|
||||
@ -367,7 +367,7 @@ struct TransportSettings {
|
||||
bool useCwndHintsInSessionTicket{false};
|
||||
|
||||
// The default priority to instantiate streams with.
|
||||
Priority defaultPriority{kDefaultPriority};
|
||||
PriorityQueue::Priority defaultPriority;
|
||||
|
||||
// How many times we will a schedule a stream to packets before moving onto
|
||||
// the next one in the queue. Only relevant for incremental priority.
|
||||
|
@ -137,6 +137,7 @@ mvfst_cpp_test(
|
||||
"fbsource//third-party/googletest:gmock",
|
||||
":mocks",
|
||||
"//quic/fizz/server/handshake:fizz_server_handshake",
|
||||
"//quic/priority:http_priority_queue",
|
||||
"//quic/server/state:server",
|
||||
"//quic/state:quic_priority_queue",
|
||||
"//quic/state:quic_state_machine",
|
||||
|
@ -12,27 +12,30 @@
|
||||
|
||||
using namespace std;
|
||||
using namespace folly;
|
||||
using namespace quic;
|
||||
|
||||
static inline uint8_t findNonemptyLevel(quic::PriorityQueue& pq) {
|
||||
static inline uint8_t findNonemptyLevel(deprecated::PriorityQueue& pq) {
|
||||
for (auto i = 0; i < 16; i++) {
|
||||
quic::Priority pri(i / 2, i % 2);
|
||||
if (!pq.levels[quic::PriorityQueue::priority2index(pri)].empty()) {
|
||||
deprecated::Priority pri(i / 2, i % 2);
|
||||
if (!pq.levels[deprecated::PriorityQueue::priority2index(pri)].empty()) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return 16;
|
||||
}
|
||||
|
||||
static inline void
|
||||
insert(quic::PriorityQueue& pq, size_t numConcurrentStreams, bool incremental) {
|
||||
static inline void insert(
|
||||
deprecated::PriorityQueue& pq,
|
||||
size_t numConcurrentStreams,
|
||||
bool incremental) {
|
||||
// insert streams at various priorities
|
||||
for (size_t i = 0; i < numConcurrentStreams; i++) {
|
||||
pq.insertOrUpdate(i, quic::Priority(i % 8, incremental));
|
||||
pq.insertOrUpdate(i, deprecated::Priority(i % 8, incremental));
|
||||
}
|
||||
}
|
||||
|
||||
static inline void processQueueIncremental(
|
||||
quic::PriorityQueue& pq,
|
||||
deprecated::PriorityQueue& pq,
|
||||
size_t numConcurrentStreams,
|
||||
size_t packetsPerStream,
|
||||
uint8_t shift) {
|
||||
@ -59,7 +62,7 @@ static inline void processQueueIncremental(
|
||||
}
|
||||
|
||||
static inline void processQueueSequential(
|
||||
quic::PriorityQueue& pq,
|
||||
deprecated::PriorityQueue& pq,
|
||||
size_t numConcurrentStreams,
|
||||
size_t packetsPerStream) {
|
||||
CHECK_GT(packetsPerStream, 0);
|
||||
@ -78,7 +81,7 @@ static inline void processQueueSequential(
|
||||
static inline void benchmarkPriority(
|
||||
size_t numConcurrentStreams,
|
||||
bool incremental) {
|
||||
quic::PriorityQueue pq;
|
||||
deprecated::PriorityQueue pq;
|
||||
insert(pq, numConcurrentStreams, incremental);
|
||||
|
||||
size_t packetsPerStream = 4;
|
||||
@ -123,7 +126,7 @@ BENCHMARK(incremental8, n) {
|
||||
BENCHMARK(insertSequential, n) {
|
||||
// insert streams at various priorities
|
||||
for (size_t j = 0; j < n; j++) {
|
||||
quic::PriorityQueue pq;
|
||||
deprecated::PriorityQueue pq;
|
||||
insert(pq, 100, false);
|
||||
pq.clear();
|
||||
}
|
||||
@ -132,7 +135,7 @@ BENCHMARK(insertSequential, n) {
|
||||
BENCHMARK(insertIncremental, n) {
|
||||
// insert streams at various priorities
|
||||
for (size_t j = 0; j < n; j++) {
|
||||
quic::PriorityQueue pq;
|
||||
deprecated::PriorityQueue pq;
|
||||
insert(pq, 100, true);
|
||||
pq.clear();
|
||||
}
|
||||
@ -142,7 +145,7 @@ BENCHMARK(processSequential, n) {
|
||||
// insert streams at various priorities
|
||||
size_t nStreams = 96;
|
||||
for (size_t j = 0; j < n; j++) {
|
||||
quic::PriorityQueue pq;
|
||||
deprecated::PriorityQueue pq;
|
||||
BENCHMARK_SUSPEND {
|
||||
insert(pq, nStreams, false);
|
||||
}
|
||||
@ -154,7 +157,7 @@ BENCHMARK(processIncremental, n) {
|
||||
// insert streams at various priorities
|
||||
size_t nStreams = 96;
|
||||
for (size_t j = 0; j < n; j++) {
|
||||
quic::PriorityQueue pq;
|
||||
deprecated::PriorityQueue pq;
|
||||
BENCHMARK_SUSPEND {
|
||||
insert(pq, nStreams, true);
|
||||
}
|
||||
@ -166,7 +169,7 @@ BENCHMARK(eraseSequential, n) {
|
||||
// insert streams at various priorities
|
||||
size_t nStreams = 96;
|
||||
for (size_t j = 0; j < n; j++) {
|
||||
quic::PriorityQueue pq;
|
||||
deprecated::PriorityQueue pq;
|
||||
BENCHMARK_SUSPEND {
|
||||
insert(pq, nStreams, false);
|
||||
}
|
||||
@ -183,7 +186,7 @@ BENCHMARK(eraseIncremental, n) {
|
||||
// insert streams at various priorities
|
||||
size_t nStreams = 96;
|
||||
for (size_t j = 0; j < n; j++) {
|
||||
quic::PriorityQueue pq;
|
||||
deprecated::PriorityQueue pq;
|
||||
BENCHMARK_SUSPEND {
|
||||
insert(pq, nStreams, true);
|
||||
}
|
||||
|
@ -12,6 +12,8 @@
|
||||
|
||||
namespace quic::test {
|
||||
|
||||
using namespace quic::deprecated;
|
||||
|
||||
class QuicPriorityQueueTest : public testing::Test {
|
||||
public:
|
||||
PriorityQueue queue_;
|
||||
|
@ -9,8 +9,8 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
|
||||
#include <quic/priority/HTTPPriorityQueue.h>
|
||||
#include <quic/server/state/ServerStateMachine.h>
|
||||
#include <quic/state/QuicPriorityQueue.h>
|
||||
#include <quic/state/QuicStreamManager.h>
|
||||
#include <quic/state/QuicStreamUtilities.h>
|
||||
#include <quic/state/stream/StreamStateFunctions.h>
|
||||
@ -75,17 +75,17 @@ TEST_P(QuicStreamManagerTest, SkipRedundantPriorityUpdate) {
|
||||
ASSERT_TRUE(streamResult.hasValue());
|
||||
auto* stream = streamResult.value();
|
||||
auto streamId = stream->id;
|
||||
Priority currentPriority = stream->priority;
|
||||
HTTPPriorityQueue::Priority currentPriority(stream->priority);
|
||||
EXPECT_TRUE(manager.setStreamPriority(
|
||||
streamId,
|
||||
Priority(
|
||||
(currentPriority.level + 1) % (kDefaultMaxPriority + 1),
|
||||
!currentPriority.incremental)));
|
||||
HTTPPriorityQueue::Priority(
|
||||
(currentPriority->urgency + 1) % (kDefaultMaxPriority + 1),
|
||||
!currentPriority->incremental)));
|
||||
EXPECT_FALSE(manager.setStreamPriority(
|
||||
streamId,
|
||||
Priority(
|
||||
(currentPriority.level + 1) % (kDefaultMaxPriority + 1),
|
||||
!currentPriority.incremental)));
|
||||
HTTPPriorityQueue::Priority(
|
||||
(currentPriority->urgency + 1) % (kDefaultMaxPriority + 1),
|
||||
!currentPriority->incremental)));
|
||||
}
|
||||
|
||||
TEST_P(QuicStreamManagerTest, TestAppIdleCreateBidiStream) {
|
||||
|
Reference in New Issue
Block a user