1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Prioritize read callbacks for unidirectional/control streams

Summary: Prioritize read callbacks for unidirectional/control streams optionally.

Reviewed By: mjoras

Differential Revision: D64439116

fbshipit-source-id: b4fe424db99921558d4672d9b21e992369072ca9
This commit is contained in:
Konstantin Tsoy
2024-10-16 12:10:44 -07:00
committed by Facebook GitHub Bot
parent cbd22e95e2
commit 013bf7cc34
6 changed files with 204 additions and 14 deletions

View File

@@ -472,19 +472,26 @@ void QuicTransportBaseLite::updateReadLooper() {
readLooper_->stop(); readLooper_->stop();
return; return;
} }
auto matcherFn = [&readCallbacks = readCallbacks_](StreamId s) {
auto readCb = readCallbacks.find(s);
if (readCb == readCallbacks.end()) {
return false;
}
// TODO: if the stream has an error and it is also paused we should
// still return an error
return readCb->second.readCb && readCb->second.resumed;
};
auto iter = std::find_if( auto iter = std::find_if(
conn_->streamManager->readableStreams().begin(), conn_->streamManager->readableStreams().begin(),
conn_->streamManager->readableStreams().end(), conn_->streamManager->readableStreams().end(),
[&readCallbacks = readCallbacks_](StreamId s) { matcherFn);
auto readCb = readCallbacks.find(s); auto unidirIter = std::find_if(
if (readCb == readCallbacks.end()) { conn_->streamManager->readableUnidirectionalStreams().begin(),
return false; conn_->streamManager->readableUnidirectionalStreams().end(),
} matcherFn);
// TODO: if the stream has an error and it is also paused we should
// still return an error
return readCb->second.readCb && readCb->second.resumed;
});
if (iter != conn_->streamManager->readableStreams().end() || if (iter != conn_->streamManager->readableStreams().end() ||
unidirIter !=
conn_->streamManager->readableUnidirectionalStreams().end() ||
!conn_->datagramState.readBuffer.empty()) { !conn_->datagramState.readBuffer.empty()) {
VLOG(10) << "Scheduling read looper " << *this; VLOG(10) << "Scheduling read looper " << *this;
readLooper_->run(); readLooper_->run();
@@ -1307,15 +1314,30 @@ void QuicTransportBaseLite::invokeReadDataAndCallbacks() {
}; };
// Need a copy since the set can change during callbacks. // Need a copy since the set can change during callbacks.
std::vector<StreamId> readableStreamsCopy; std::vector<StreamId> readableStreamsCopy;
const auto& readableStreams = self->conn_->streamManager->readableStreams(); const auto& readableStreams = self->conn_->streamManager->readableStreams();
readableStreamsCopy.reserve(readableStreams.size()); const auto& readableUnidirectionalStreams =
self->conn_->streamManager->readableUnidirectionalStreams();
readableStreamsCopy.reserve(
readableStreams.size() + readableUnidirectionalStreams.size());
if (self->conn_->transportSettings.unidirectionalStreamsReadCallbacksFirst) {
std::copy(
readableUnidirectionalStreams.begin(),
readableUnidirectionalStreams.end(),
std::back_inserter(readableStreamsCopy));
}
std::copy( std::copy(
readableStreams.begin(), readableStreams.begin(),
readableStreams.end(), readableStreams.end(),
std::back_inserter(readableStreamsCopy)); std::back_inserter(readableStreamsCopy));
if (self->conn_->transportSettings.orderedReadCallbacks) { if (self->conn_->transportSettings.orderedReadCallbacks) {
std::sort(readableStreamsCopy.begin(), readableStreamsCopy.end()); std::sort(readableStreamsCopy.begin(), readableStreamsCopy.end());
} }
for (StreamId streamId : readableStreamsCopy) { for (StreamId streamId : readableStreamsCopy) {
auto callback = self->readCallbacks_.find(streamId); auto callback = self->readCallbacks_.find(streamId);
if (callback == self->readCallbacks_.end()) { if (callback == self->readCallbacks_.end()) {
@@ -1325,7 +1347,14 @@ void QuicTransportBaseLite::invokeReadDataAndCallbacks() {
auto readCb = callback->second.readCb; auto readCb = callback->second.readCb;
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId)); auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId));
if (readCb && stream->streamReadError) { if (readCb && stream->streamReadError) {
self->conn_->streamManager->readableStreams().erase(streamId); if (self->conn_->transportSettings
.unidirectionalStreamsReadCallbacksFirst &&
isUnidirectionalStream(streamId)) {
self->conn_->streamManager->readableUnidirectionalStreams().erase(
streamId);
} else {
self->conn_->streamManager->readableStreams().erase(streamId);
}
readCallbacks_.erase(callback); readCallbacks_.erase(callback);
// if there is an error on the stream - it's not readable anymore, so // if there is an error on the stream - it's not readable anymore, so
// we cannot peek into it as well. // we cannot peek into it as well.

View File

@@ -960,6 +960,52 @@ TEST_P(QuicTransportImplTestBase, ReadCallbackDataAvailable) {
transport.reset(); transport.reset();
} }
TEST_P(
QuicTransportImplTestBase,
ReadCallbackDataAvailableWithUnidirPrioritized) {
InSequence seq;
auto transportSettings = transport->getTransportSettings();
transportSettings.unidirectionalStreamsReadCallbacksFirst = true;
transport->setTransportSettings(transportSettings);
auto& streamManager = *transport->transportConn->streamManager;
auto nextPeerUniStream =
streamManager.nextAcceptablePeerUnidirectionalStreamId();
EXPECT_TRUE(nextPeerUniStream.has_value());
StreamId qpackStream = streamManager.getStream(*nextPeerUniStream)->id;
auto requestStream = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> requestStreamCb;
NiceMock<MockReadCallback> qpackStreamCb;
transport->setReadCallback(requestStream, &requestStreamCb);
transport->setReadCallback(qpackStream, &qpackStreamCb);
transport->addDataToStream(
qpackStream,
StreamBuffer(
folly::IOBuf::copyBuffer(
"and i'm qpack data i will block you no tomorrow"),
0));
transport->addDataToStream(
requestStream, StreamBuffer(folly::IOBuf::copyBuffer("i'm headers"), 0));
bool qpackCbCalled = false;
EXPECT_CALL(qpackStreamCb, readAvailable(qpackStream))
.WillOnce(Invoke([&](StreamId) {
EXPECT_FALSE(qpackCbCalled);
qpackCbCalled = true;
}));
EXPECT_CALL(requestStreamCb, readAvailable(requestStream))
.WillOnce(Invoke([&](StreamId) { EXPECT_TRUE(qpackCbCalled); }));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackDataAvailableNoReap) { TEST_P(QuicTransportImplTestBase, ReadCallbackDataAvailableNoReap) {
auto stream1 = transport->createBidirectionalStream().value(); auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value();

View File

@@ -555,7 +555,12 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
} }
VLOG(10) << "Removing closed stream=" << streamId; VLOG(10) << "Removing closed stream=" << streamId;
DCHECK(it->second.inTerminalStates()); DCHECK(it->second.inTerminalStates());
readableStreams_.erase(streamId); if (conn_.transportSettings.unidirectionalStreamsReadCallbacksFirst &&
isUnidirectionalStream(streamId)) {
unidirectionalReadableStreams_.erase(streamId);
} else {
readableStreams_.erase(streamId);
}
peekableStreams_.erase(streamId); peekableStreams_.erase(streamId);
removeWritable(it->second); removeWritable(it->second);
blockedStreams_.erase(streamId); blockedStreams_.erase(streamId);
@@ -624,12 +629,31 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
notifyStreamPriorityChanges(); notifyStreamPriorityChanges();
} }
void QuicStreamManager::addToReadableStreams(const QuicStreamState& stream) {
if (conn_.transportSettings.unidirectionalStreamsReadCallbacksFirst &&
isUnidirectionalStream(stream.id)) {
unidirectionalReadableStreams_.emplace(stream.id);
} else {
readableStreams_.emplace(stream.id);
}
}
void QuicStreamManager::removeFromReadableStreams(
const QuicStreamState& stream) {
if (conn_.transportSettings.unidirectionalStreamsReadCallbacksFirst &&
isUnidirectionalStream(stream.id)) {
unidirectionalReadableStreams_.erase(stream.id);
} else {
readableStreams_.erase(stream.id);
}
}
void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) { void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) {
updateHolBlockedTime(stream); updateHolBlockedTime(stream);
if (stream.hasReadableData() || stream.streamReadError.has_value()) { if (stream.hasReadableData() || stream.streamReadError.has_value()) {
readableStreams_.emplace(stream.id); addToReadableStreams(stream);
} else { } else {
readableStreams_.erase(stream.id); removeFromReadableStreams(stream);
} }
} }

View File

@@ -203,6 +203,8 @@ class QuicStreamManager {
lossStreams_ = std::move(other.lossStreams_); lossStreams_ = std::move(other.lossStreams_);
lossDSRStreams_ = std::move(other.lossDSRStreams_); lossDSRStreams_ = std::move(other.lossDSRStreams_);
readableStreams_ = std::move(other.readableStreams_); readableStreams_ = std::move(other.readableStreams_);
unidirectionalReadableStreams_ =
std::move(other.unidirectionalReadableStreams_);
peekableStreams_ = std::move(other.peekableStreams_); peekableStreams_ = std::move(other.peekableStreams_);
writeQueue_ = std::move(other.writeQueue_); writeQueue_ = std::move(other.writeQueue_);
controlWriteQueue_ = std::move(other.controlWriteQueue_); controlWriteQueue_ = std::move(other.controlWriteQueue_);
@@ -799,6 +801,10 @@ class QuicStreamManager {
return readableStreams_; return readableStreams_;
} }
auto& readableUnidirectionalStreams() {
return unidirectionalReadableStreams_;
}
// TODO figure out a better interface here. // TODO figure out a better interface here.
/* /*
* Returns a mutable reference to the underlying peekable streams container. * Returns a mutable reference to the underlying peekable streams container.
@@ -1001,6 +1007,7 @@ class QuicStreamManager {
deliverableStreams_.clear(); deliverableStreams_.clear();
txStreams_.clear(); txStreams_.clear();
readableStreams_.clear(); readableStreams_.clear();
unidirectionalReadableStreams_.clear();
peekableStreams_.clear(); peekableStreams_.clear();
flowControlUpdated_.clear(); flowControlUpdated_.clear();
} }
@@ -1082,6 +1089,9 @@ class QuicStreamManager {
StreamGroupId& groupId, StreamGroupId& groupId,
StreamIdSet& streamGroups); StreamIdSet& streamGroups);
void addToReadableStreams(const QuicStreamState& stream);
void removeFromReadableStreams(const QuicStreamState& stream);
QuicConnectionStateBase& conn_; QuicConnectionStateBase& conn_;
QuicNodeType nodeType_; QuicNodeType nodeType_;
@@ -1202,6 +1212,12 @@ class QuicStreamManager {
// Set of streams that have pending reads // Set of streams that have pending reads
folly::F14FastSet<StreamId> readableStreams_; folly::F14FastSet<StreamId> readableStreams_;
// Set of unidirectional streams that have pending reads.
// Used separately from readableStreams_ when
// unidirectionalStreamsReadCallbacksFirst = true to prioritize unidirectional
// streams read callbacks.
folly::F14FastSet<StreamId> unidirectionalReadableStreams_;
// Set of streams that have pending peeks // Set of streams that have pending peeks
folly::F14FastSet<StreamId> peekableStreams_; folly::F14FastSet<StreamId> peekableStreams_;

View File

@@ -414,6 +414,9 @@ struct TransportSettings {
bool alwaysPtoMultiple{false}; bool alwaysPtoMultiple{false};
// Use a reordering threshold heuristic of inflight / 2. // Use a reordering threshold heuristic of inflight / 2.
bool useInflightReorderingThreshold{false}; bool useInflightReorderingThreshold{false};
// Raise read callbacks for all unidirectional streams first on data
// reception.
bool unidirectionalStreamsReadCallbacksFirst{false};
}; };
} // namespace quic } // namespace quic

View File

@@ -648,6 +648,78 @@ TEST_P(QuicStreamManagerTest, TestClearActionable) {
EXPECT_TRUE(manager.peekableStreams().empty()); EXPECT_TRUE(manager.peekableStreams().empty());
} }
TEST_P(QuicStreamManagerTest, TestUnidirectionalStreamsSeparateSet) {
conn.transportSettings.unidirectionalStreamsReadCallbacksFirst = true;
auto& manager = *conn.streamManager;
StreamId id = 1;
auto stream = manager.createNextUnidirectionalStream().value();
stream->readBuffer.emplace_back(folly::IOBuf::copyBuffer("blah blah"), 0);
manager.queueFlowControlUpdated(id);
manager.addDeliverable(id);
manager.updateReadableStreams(*stream);
manager.updatePeekableStreams(*stream);
EXPECT_TRUE(manager.flowControlUpdatedContains(id));
EXPECT_TRUE(manager.deliverableContains(id));
EXPECT_TRUE(manager.readableStreams().empty());
EXPECT_FALSE(manager.readableUnidirectionalStreams().empty());
EXPECT_FALSE(manager.peekableStreams().empty());
manager.clearActionable();
EXPECT_FALSE(manager.flowControlUpdatedContains(id));
EXPECT_FALSE(manager.deliverableContains(id));
EXPECT_TRUE(manager.readableStreams().empty());
EXPECT_TRUE(manager.peekableStreams().empty());
}
TEST_P(
QuicStreamManagerTest,
TestUnidirectionalStreamsSeparateSetRemoveStream) {
conn.transportSettings.unidirectionalStreamsReadCallbacksFirst = true;
auto& manager = *conn.streamManager;
auto stream = manager.createNextUnidirectionalStream().value();
stream->readBuffer.emplace_back(folly::IOBuf::copyBuffer("blah blah"), 0);
manager.updateReadableStreams(*stream);
manager.updatePeekableStreams(*stream);
EXPECT_TRUE(manager.readableStreams().empty());
EXPECT_FALSE(manager.readableUnidirectionalStreams().empty());
EXPECT_FALSE(manager.peekableStreams().empty());
// Remove data from stream.
stream->readBuffer.clear();
manager.updateReadableStreams(*stream);
EXPECT_TRUE(manager.readableStreams().empty());
EXPECT_TRUE(manager.readableUnidirectionalStreams().empty());
}
TEST_P(QuicStreamManagerTest, TestUnidirectionalStreamsSeparateSetTwoStreams) {
conn.transportSettings.unidirectionalStreamsReadCallbacksFirst = true;
auto& manager = *conn.streamManager;
auto stream = manager.createNextBidirectionalStream().value();
stream->readBuffer.emplace_back(
folly::IOBuf::copyBuffer("and i'm headers"), 0);
manager.updateReadableStreams(*stream);
manager.updatePeekableStreams(*stream);
EXPECT_EQ(manager.readableStreams().size(), 1);
EXPECT_EQ(manager.readableUnidirectionalStreams().size(), 0);
auto stream2 = manager.createNextUnidirectionalStream().value();
stream2->readBuffer.emplace_back(
folly::IOBuf::copyBuffer("look at me, i am qpack data"), 0);
manager.updateReadableStreams(*stream2);
manager.updatePeekableStreams(*stream2);
EXPECT_EQ(manager.readableStreams().size(), 1);
EXPECT_EQ(manager.readableUnidirectionalStreams().size(), 1);
}
TEST_P(QuicStreamManagerTest, WriteBufferMeta) { TEST_P(QuicStreamManagerTest, WriteBufferMeta) {
auto& manager = *conn.streamManager; auto& manager = *conn.streamManager;
auto stream = manager.createNextUnidirectionalStream().value(); auto stream = manager.createNextUnidirectionalStream().value();