1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-07-30 14:43:05 +03:00

refactor setStreamPriority to accept Priority struct

Summary:
## Context
Context: see summary for D43854603.

## In this diff
In this diff, make way for easily adding new elements to Priority struct.

Reviewed By: afrind

Differential Revision: D43882907

fbshipit-source-id: f74f6ec41162a970dcd666bfa5192676f968d740
This commit is contained in:
Paul Farcasanu
2023-03-28 16:00:38 -07:00
committed by Facebook GitHub Bot
parent a1a5617ac4
commit e56d4bd7f6
10 changed files with 40 additions and 35 deletions

View File

@ -567,8 +567,20 @@ class QuicSocket {
* Set stream priority. * Set stream priority.
* level: can only be in [0, 7]. * level: can only be in [0, 7].
*/ */
virtual folly::Expected<folly::Unit, LocalErrorCode> folly::Expected<folly::Unit, LocalErrorCode>
setStreamPriority(StreamId id, PriorityLevel level, bool incremental) = 0; setStreamPriority(StreamId id, PriorityLevel level, bool incremental) {
return setStreamPriority(id, Priority(level, incremental));
}
/**
* Set stream priority.
* level: can only be in [0, 7].
* incremental: true/false
* orderId: uint64
*/
virtual folly::Expected<folly::Unit, LocalErrorCode> setStreamPriority(
StreamId id,
Priority priority) = 0;
/** /**
* Get stream priority. * Get stream priority.

View File

@ -3311,14 +3311,11 @@ const TransportSettings& QuicTransportBase::getTransportSettings() const {
} }
folly::Expected<folly::Unit, LocalErrorCode> folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setStreamPriority( QuicTransportBase::setStreamPriority(StreamId id, Priority priority) {
StreamId id,
PriorityLevel level,
bool incremental) {
if (closeState_ != CloseState::OPEN) { if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
} }
if (level > kDefaultMaxPriority) { if (priority.level > kDefaultMaxPriority) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
} }
if (!conn_->streamManager->streamExists(id)) { if (!conn_->streamManager->streamExists(id)) {
@ -3327,10 +3324,9 @@ QuicTransportBase::setStreamPriority(
} }
// It's not an error to prioritize a stream after it's sent its FIN - this // It's not an error to prioritize a stream after it's sent its FIN - this
// can reprioritize retransmissions. // can reprioritize retransmissions.
bool updated = bool updated = conn_->streamManager->setStreamPriority(id, priority);
conn_->streamManager->setStreamPriority(id, level, incremental);
if (updated && conn_->qLogger) { if (updated && conn_->qLogger) {
conn_->qLogger->addPriorityUpdate(id, level, incremental); conn_->qLogger->addPriorityUpdate(id, priority.level, priority.incremental);
} }
return folly::unit; return folly::unit;
} }

View File

@ -337,8 +337,7 @@ class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver {
folly::Expected<folly::Unit, LocalErrorCode> setStreamPriority( folly::Expected<folly::Unit, LocalErrorCode> setStreamPriority(
StreamId id, StreamId id,
PriorityLevel level, Priority priority) override;
bool incremental) override;
folly::Expected<Priority, LocalErrorCode> getStreamPriority( folly::Expected<Priority, LocalErrorCode> getStreamPriority(
StreamId id) override; StreamId id) override;

View File

@ -126,7 +126,7 @@ class MockQuicSocket : public QuicSocket {
MOCK_METHOD( MOCK_METHOD(
(folly::Expected<folly::Unit, LocalErrorCode>), (folly::Expected<folly::Unit, LocalErrorCode>),
setStreamPriority, setStreamPriority,
(StreamId, uint8_t, bool)); (StreamId, Priority));
MOCK_METHOD( MOCK_METHOD(
(folly::Expected<Priority, LocalErrorCode>), (folly::Expected<Priority, LocalErrorCode>),
getStreamPriority, getStreamPriority,

View File

@ -1670,12 +1670,12 @@ TEST_F(QuicPacketSchedulerTest, HighPriNewDataBeforeLowPriLossData) {
lowPriStream->lossBuffer.push_back( lowPriStream->lossBuffer.push_back(
StreamBuffer(folly::IOBuf::copyBuffer("Onegin"), 0, false)); StreamBuffer(folly::IOBuf::copyBuffer("Onegin"), 0, false));
conn.streamManager->updateWritableStreams(*lowPriStream); conn.streamManager->updateWritableStreams(*lowPriStream);
conn.streamManager->setStreamPriority(lowPriStream->id, 5, false); conn.streamManager->setStreamPriority(lowPriStream->id, Priority(5, false));
auto inBuf = buildRandomInputData(conn.udpSendPacketLen * 10); auto inBuf = buildRandomInputData(conn.udpSendPacketLen * 10);
writeDataToQuicStream(*highPriStream, inBuf->clone(), false); writeDataToQuicStream(*highPriStream, inBuf->clone(), false);
conn.streamManager->updateWritableStreams(*highPriStream); conn.streamManager->updateWritableStreams(*highPriStream);
conn.streamManager->setStreamPriority(highPriStream->id, 0, false); conn.streamManager->setStreamPriority(highPriStream->id, Priority(0, false));
StreamFrameScheduler scheduler(conn); StreamFrameScheduler scheduler(conn);
ShortHeader shortHeader( ShortHeader shortHeader(
@ -1816,7 +1816,7 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) {
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 1000; conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 1000;
auto streamId = (*conn.streamManager->createNextBidirectionalStream())->id; auto streamId = (*conn.streamManager->createNextBidirectionalStream())->id;
conn.streamManager->setStreamPriority(streamId, 0, false); conn.streamManager->setStreamPriority(streamId, Priority(0, false));
auto stream = conn.streamManager->findStream(streamId); auto stream = conn.streamManager->findStream(streamId);
auto data = buildRandomInputData(1000); auto data = buildRandomInputData(1000);
writeDataToQuicStream(*stream, std::move(data), true); writeDataToQuicStream(*stream, std::move(data), true);

View File

@ -4217,7 +4217,7 @@ TEST_P(QuicTransportImplTestBase, BackgroundModeChangeWithStreamChanges) {
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(_)) EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(_))
.Times(0); // Backgound params not set .Times(0); // Backgound params not set
auto stream = manager.createNextUnidirectionalStream().value(); auto stream = manager.createNextUnidirectionalStream().value();
manager.setStreamPriority(stream->id, 1, false); manager.setStreamPriority(stream->id, Priority(1, false));
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5)) EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(0.5))
.Times(1); // On setting the background params .Times(1); // On setting the background params
@ -4236,7 +4236,7 @@ TEST_P(QuicTransportImplTestBase, BackgroundModeChangeWithStreamChanges) {
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0)) EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
.Times(1); // On increasing the priority of one of the streams .Times(1); // On increasing the priority of one of the streams
manager.setStreamPriority(stream3id, 0, false); manager.setStreamPriority(stream3id, Priority(0, false));
EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0)) EXPECT_CALL(*rawCongestionController, setBandwidthUtilizationFactor(1.0))
.Times(1); // a new lower priority stream does not affect the utlization .Times(1); // a new lower priority stream does not affect the utlization

View File

@ -1629,7 +1629,7 @@ TEST_F(QuicTransportTest, WriteSmall) {
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false); transport_->writeChain(stream, buf->clone(), false);
transport_->setStreamPriority(stream, 0, false); transport_->setStreamPriority(stream, Priority(0, false));
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
verifyCorrectness(conn, 0, stream, *buf); verifyCorrectness(conn, 0, stream, *buf);
@ -2761,7 +2761,7 @@ TEST_F(QuicTransportTest, NonWritableStreamAPI) {
// Check that write-side APIs return an error // Check that write-side APIs return an error
auto res2 = transport_->notifyPendingWriteOnStream(streamId, &writeCallback_); auto res2 = transport_->notifyPendingWriteOnStream(streamId, &writeCallback_);
EXPECT_EQ(LocalErrorCode::STREAM_CLOSED, res2.error()); EXPECT_EQ(LocalErrorCode::STREAM_CLOSED, res2.error());
auto res3 = transport_->setStreamPriority(streamId, 0, false); auto res3 = transport_->setStreamPriority(streamId, Priority(0, false));
EXPECT_FALSE(res3.hasError()); EXPECT_FALSE(res3.hasError());
} }
@ -4744,7 +4744,7 @@ TEST_F(QuicTransportTest, GetStreamPacketsTxedMultiplePackets) {
TEST_F(QuicTransportTest, PrioritySetAndGet) { TEST_F(QuicTransportTest, PrioritySetAndGet) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
EXPECT_EQ(kDefaultPriority, transport_->getStreamPriority(stream).value()); EXPECT_EQ(kDefaultPriority, transport_->getStreamPriority(stream).value());
transport_->setStreamPriority(stream, 0, false); transport_->setStreamPriority(stream, Priority(0, false));
EXPECT_EQ(Priority(0, false), transport_->getStreamPriority(stream).value()); EXPECT_EQ(Priority(0, false), transport_->getStreamPriority(stream).value());
auto nonExistStreamPri = transport_->getStreamPriority(stream + 4); auto nonExistStreamPri = transport_->getStreamPriority(stream + 4);
EXPECT_TRUE(nonExistStreamPri.hasError()); EXPECT_TRUE(nonExistStreamPri.hasError());

View File

@ -228,13 +228,9 @@ bool QuicStreamManager::consumeMaxLocalUnidirectionalStreamIdIncreased() {
return res; return res;
} }
bool QuicStreamManager::setStreamPriority( bool QuicStreamManager::setStreamPriority(StreamId id, Priority newPriority) {
StreamId id,
PriorityLevel level,
bool incremental) {
auto stream = findStream(id); auto stream = findStream(id);
if (stream) { if (stream) {
Priority newPriority(level, incremental);
if (stream->priority == newPriority) { if (stream->priority == newPriority) {
return false; return false;
} }
@ -250,7 +246,7 @@ bool QuicStreamManager::setStreamPriority(
} }
notifyStreamPriorityChanges(); notifyStreamPriorityChanges();
} }
// If this stream is already in the writable or loss queus, update the // If this stream is already in the writable or loss queues, update the
// priority there. // priority there.
writableStreams_.updateIfExist(id, stream->priority); writableStreams_.updateIfExist(id, stream->priority);
writableDSRStreams_.updateIfExist(id, stream->priority); writableDSRStreams_.updateIfExist(id, stream->priority);

View File

@ -403,7 +403,7 @@ class QuicStreamManager {
* passed in values are different from current priority. Return true if * passed in values are different from current priority. Return true if
* stream priority is update, false otherwise. * stream priority is update, false otherwise.
*/ */
bool setStreamPriority(StreamId id, PriorityLevel level, bool incremental); bool setStreamPriority(StreamId id, Priority priority);
// TODO figure out a better interface here. // TODO figure out a better interface here.
/* /*

View File

@ -66,12 +66,14 @@ TEST_P(QuicStreamManagerTest, SkipRedundantPriorityUpdate) {
Priority currentPriority = stream.value()->priority; Priority currentPriority = stream.value()->priority;
EXPECT_TRUE(manager.setStreamPriority( EXPECT_TRUE(manager.setStreamPriority(
streamId, streamId,
(currentPriority.level + 1) % (kDefaultMaxPriority + 1), Priority(
!currentPriority.incremental)); (currentPriority.level + 1) % (kDefaultMaxPriority + 1),
!currentPriority.incremental)));
EXPECT_FALSE(manager.setStreamPriority( EXPECT_FALSE(manager.setStreamPriority(
streamId, streamId,
(currentPriority.level + 1) % (kDefaultMaxPriority + 1), Priority(
!currentPriority.incremental)); (currentPriority.level + 1) % (kDefaultMaxPriority + 1),
!currentPriority.incremental)));
} }
TEST_P(QuicStreamManagerTest, TestAppIdleCreateBidiStream) { TEST_P(QuicStreamManagerTest, TestAppIdleCreateBidiStream) {
@ -672,7 +674,7 @@ TEST_P(QuicStreamManagerTest, NotifyOnStreamPriorityChanges) {
auto stream = manager.createNextUnidirectionalStream().value(); auto stream = manager.createNextUnidirectionalStream().value();
EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level); EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level);
manager.setStreamPriority(stream->id, 1, false); manager.setStreamPriority(stream->id, Priority(1, false));
EXPECT_EQ(manager.getHighestPriorityLevel(), 1); EXPECT_EQ(manager.getHighestPriorityLevel(), 1);
EXPECT_CALL(mObserver, onStreamPrioritiesChange()) EXPECT_CALL(mObserver, onStreamPrioritiesChange())
@ -692,7 +694,7 @@ TEST_P(QuicStreamManagerTest, NotifyOnStreamPriorityChanges) {
EXPECT_CALL(mObserver, onStreamPrioritiesChange()) EXPECT_CALL(mObserver, onStreamPrioritiesChange())
.Times(1); // On increasing the priority of one of the streams .Times(1); // On increasing the priority of one of the streams
manager.setStreamPriority(stream3->id, 0, false); manager.setStreamPriority(stream3->id, Priority(0, false));
EXPECT_EQ(manager.getHighestPriorityLevel(), 0); EXPECT_EQ(manager.getHighestPriorityLevel(), 0);
EXPECT_CALL(mObserver, onStreamPrioritiesChange()) EXPECT_CALL(mObserver, onStreamPrioritiesChange())
@ -732,7 +734,7 @@ TEST_P(QuicStreamManagerTest, StreamPriorityExcludesControl) {
auto stream = manager.createNextUnidirectionalStream().value(); auto stream = manager.createNextUnidirectionalStream().value();
EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level); EXPECT_EQ(manager.getHighestPriorityLevel(), kDefaultPriority.level);
manager.setStreamPriority(stream->id, 1, false); manager.setStreamPriority(stream->id, Priority(1, false));
EXPECT_EQ(manager.getHighestPriorityLevel(), 1); EXPECT_EQ(manager.getHighestPriorityLevel(), 1);
manager.setStreamAsControl(*stream); manager.setStreamAsControl(*stream);