diff --git a/quic/api/QuicTransportBaseLite.cpp b/quic/api/QuicTransportBaseLite.cpp index b827db710..e009947a9 100644 --- a/quic/api/QuicTransportBaseLite.cpp +++ b/quic/api/QuicTransportBaseLite.cpp @@ -472,19 +472,26 @@ void QuicTransportBaseLite::updateReadLooper() { readLooper_->stop(); return; } + auto matcherFn = [&readCallbacks = readCallbacks_](StreamId s) { + auto readCb = readCallbacks.find(s); + if (readCb == readCallbacks.end()) { + return false; + } + // TODO: if the stream has an error and it is also paused we should + // still return an error + return readCb->second.readCb && readCb->second.resumed; + }; auto iter = std::find_if( conn_->streamManager->readableStreams().begin(), conn_->streamManager->readableStreams().end(), - [&readCallbacks = readCallbacks_](StreamId s) { - auto readCb = readCallbacks.find(s); - if (readCb == readCallbacks.end()) { - return false; - } - // TODO: if the stream has an error and it is also paused we should - // still return an error - return readCb->second.readCb && readCb->second.resumed; - }); + matcherFn); + auto unidirIter = std::find_if( + conn_->streamManager->readableUnidirectionalStreams().begin(), + conn_->streamManager->readableUnidirectionalStreams().end(), + matcherFn); if (iter != conn_->streamManager->readableStreams().end() || + unidirIter != + conn_->streamManager->readableUnidirectionalStreams().end() || !conn_->datagramState.readBuffer.empty()) { VLOG(10) << "Scheduling read looper " << *this; readLooper_->run(); @@ -1307,15 +1314,30 @@ void QuicTransportBaseLite::invokeReadDataAndCallbacks() { }; // Need a copy since the set can change during callbacks. std::vector readableStreamsCopy; + const auto& readableStreams = self->conn_->streamManager->readableStreams(); - readableStreamsCopy.reserve(readableStreams.size()); + const auto& readableUnidirectionalStreams = + self->conn_->streamManager->readableUnidirectionalStreams(); + + readableStreamsCopy.reserve( + readableStreams.size() + readableUnidirectionalStreams.size()); + + if (self->conn_->transportSettings.unidirectionalStreamsReadCallbacksFirst) { + std::copy( + readableUnidirectionalStreams.begin(), + readableUnidirectionalStreams.end(), + std::back_inserter(readableStreamsCopy)); + } + std::copy( readableStreams.begin(), readableStreams.end(), std::back_inserter(readableStreamsCopy)); + if (self->conn_->transportSettings.orderedReadCallbacks) { std::sort(readableStreamsCopy.begin(), readableStreamsCopy.end()); } + for (StreamId streamId : readableStreamsCopy) { auto callback = self->readCallbacks_.find(streamId); if (callback == self->readCallbacks_.end()) { @@ -1325,7 +1347,14 @@ void QuicTransportBaseLite::invokeReadDataAndCallbacks() { auto readCb = callback->second.readCb; auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId)); if (readCb && stream->streamReadError) { - self->conn_->streamManager->readableStreams().erase(streamId); + if (self->conn_->transportSettings + .unidirectionalStreamsReadCallbacksFirst && + isUnidirectionalStream(streamId)) { + self->conn_->streamManager->readableUnidirectionalStreams().erase( + streamId); + } else { + self->conn_->streamManager->readableStreams().erase(streamId); + } readCallbacks_.erase(callback); // if there is an error on the stream - it's not readable anymore, so // we cannot peek into it as well. diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 7e1ea725d..f13df3951 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -960,6 +960,52 @@ TEST_P(QuicTransportImplTestBase, ReadCallbackDataAvailable) { transport.reset(); } +TEST_P( + QuicTransportImplTestBase, + ReadCallbackDataAvailableWithUnidirPrioritized) { + InSequence seq; + + auto transportSettings = transport->getTransportSettings(); + transportSettings.unidirectionalStreamsReadCallbacksFirst = true; + transport->setTransportSettings(transportSettings); + + auto& streamManager = *transport->transportConn->streamManager; + auto nextPeerUniStream = + streamManager.nextAcceptablePeerUnidirectionalStreamId(); + EXPECT_TRUE(nextPeerUniStream.has_value()); + StreamId qpackStream = streamManager.getStream(*nextPeerUniStream)->id; + + auto requestStream = transport->createBidirectionalStream().value(); + + NiceMock requestStreamCb; + NiceMock qpackStreamCb; + + transport->setReadCallback(requestStream, &requestStreamCb); + transport->setReadCallback(qpackStream, &qpackStreamCb); + + transport->addDataToStream( + qpackStream, + StreamBuffer( + folly::IOBuf::copyBuffer( + "and i'm qpack data i will block you no tomorrow"), + 0)); + transport->addDataToStream( + requestStream, StreamBuffer(folly::IOBuf::copyBuffer("i'm headers"), 0)); + + bool qpackCbCalled = false; + + EXPECT_CALL(qpackStreamCb, readAvailable(qpackStream)) + .WillOnce(Invoke([&](StreamId) { + EXPECT_FALSE(qpackCbCalled); + qpackCbCalled = true; + })); + EXPECT_CALL(requestStreamCb, readAvailable(requestStream)) + .WillOnce(Invoke([&](StreamId) { EXPECT_TRUE(qpackCbCalled); })); + transport->driveReadCallbacks(); + + transport.reset(); +} + TEST_P(QuicTransportImplTestBase, ReadCallbackDataAvailableNoReap) { auto stream1 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value(); diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index e49abadc6..e14acf0e8 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -555,7 +555,12 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { } VLOG(10) << "Removing closed stream=" << streamId; DCHECK(it->second.inTerminalStates()); - readableStreams_.erase(streamId); + if (conn_.transportSettings.unidirectionalStreamsReadCallbacksFirst && + isUnidirectionalStream(streamId)) { + unidirectionalReadableStreams_.erase(streamId); + } else { + readableStreams_.erase(streamId); + } peekableStreams_.erase(streamId); removeWritable(it->second); blockedStreams_.erase(streamId); @@ -624,12 +629,31 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { notifyStreamPriorityChanges(); } +void QuicStreamManager::addToReadableStreams(const QuicStreamState& stream) { + if (conn_.transportSettings.unidirectionalStreamsReadCallbacksFirst && + isUnidirectionalStream(stream.id)) { + unidirectionalReadableStreams_.emplace(stream.id); + } else { + readableStreams_.emplace(stream.id); + } +} + +void QuicStreamManager::removeFromReadableStreams( + const QuicStreamState& stream) { + if (conn_.transportSettings.unidirectionalStreamsReadCallbacksFirst && + isUnidirectionalStream(stream.id)) { + unidirectionalReadableStreams_.erase(stream.id); + } else { + readableStreams_.erase(stream.id); + } +} + void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) { updateHolBlockedTime(stream); if (stream.hasReadableData() || stream.streamReadError.has_value()) { - readableStreams_.emplace(stream.id); + addToReadableStreams(stream); } else { - readableStreams_.erase(stream.id); + removeFromReadableStreams(stream); } } diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index 0cfb1d8e5..794febd94 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -203,6 +203,8 @@ class QuicStreamManager { lossStreams_ = std::move(other.lossStreams_); lossDSRStreams_ = std::move(other.lossDSRStreams_); readableStreams_ = std::move(other.readableStreams_); + unidirectionalReadableStreams_ = + std::move(other.unidirectionalReadableStreams_); peekableStreams_ = std::move(other.peekableStreams_); writeQueue_ = std::move(other.writeQueue_); controlWriteQueue_ = std::move(other.controlWriteQueue_); @@ -799,6 +801,10 @@ class QuicStreamManager { return readableStreams_; } + auto& readableUnidirectionalStreams() { + return unidirectionalReadableStreams_; + } + // TODO figure out a better interface here. /* * Returns a mutable reference to the underlying peekable streams container. @@ -1001,6 +1007,7 @@ class QuicStreamManager { deliverableStreams_.clear(); txStreams_.clear(); readableStreams_.clear(); + unidirectionalReadableStreams_.clear(); peekableStreams_.clear(); flowControlUpdated_.clear(); } @@ -1082,6 +1089,9 @@ class QuicStreamManager { StreamGroupId& groupId, StreamIdSet& streamGroups); + void addToReadableStreams(const QuicStreamState& stream); + void removeFromReadableStreams(const QuicStreamState& stream); + QuicConnectionStateBase& conn_; QuicNodeType nodeType_; @@ -1202,6 +1212,12 @@ class QuicStreamManager { // Set of streams that have pending reads folly::F14FastSet readableStreams_; + // Set of unidirectional streams that have pending reads. + // Used separately from readableStreams_ when + // unidirectionalStreamsReadCallbacksFirst = true to prioritize unidirectional + // streams read callbacks. + folly::F14FastSet unidirectionalReadableStreams_; + // Set of streams that have pending peeks folly::F14FastSet peekableStreams_; diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index d05bfb4b6..c2124d6cd 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -414,6 +414,9 @@ struct TransportSettings { bool alwaysPtoMultiple{false}; // Use a reordering threshold heuristic of inflight / 2. bool useInflightReorderingThreshold{false}; + // Raise read callbacks for all unidirectional streams first on data + // reception. + bool unidirectionalStreamsReadCallbacksFirst{false}; }; } // namespace quic diff --git a/quic/state/test/QuicStreamManagerTest.cpp b/quic/state/test/QuicStreamManagerTest.cpp index f94f99293..918c270e1 100644 --- a/quic/state/test/QuicStreamManagerTest.cpp +++ b/quic/state/test/QuicStreamManagerTest.cpp @@ -648,6 +648,78 @@ TEST_P(QuicStreamManagerTest, TestClearActionable) { EXPECT_TRUE(manager.peekableStreams().empty()); } +TEST_P(QuicStreamManagerTest, TestUnidirectionalStreamsSeparateSet) { + conn.transportSettings.unidirectionalStreamsReadCallbacksFirst = true; + auto& manager = *conn.streamManager; + + StreamId id = 1; + auto stream = manager.createNextUnidirectionalStream().value(); + stream->readBuffer.emplace_back(folly::IOBuf::copyBuffer("blah blah"), 0); + manager.queueFlowControlUpdated(id); + manager.addDeliverable(id); + manager.updateReadableStreams(*stream); + manager.updatePeekableStreams(*stream); + + EXPECT_TRUE(manager.flowControlUpdatedContains(id)); + EXPECT_TRUE(manager.deliverableContains(id)); + EXPECT_TRUE(manager.readableStreams().empty()); + EXPECT_FALSE(manager.readableUnidirectionalStreams().empty()); + EXPECT_FALSE(manager.peekableStreams().empty()); + manager.clearActionable(); + EXPECT_FALSE(manager.flowControlUpdatedContains(id)); + EXPECT_FALSE(manager.deliverableContains(id)); + EXPECT_TRUE(manager.readableStreams().empty()); + EXPECT_TRUE(manager.peekableStreams().empty()); +} + +TEST_P( + QuicStreamManagerTest, + TestUnidirectionalStreamsSeparateSetRemoveStream) { + conn.transportSettings.unidirectionalStreamsReadCallbacksFirst = true; + auto& manager = *conn.streamManager; + + auto stream = manager.createNextUnidirectionalStream().value(); + stream->readBuffer.emplace_back(folly::IOBuf::copyBuffer("blah blah"), 0); + manager.updateReadableStreams(*stream); + manager.updatePeekableStreams(*stream); + + EXPECT_TRUE(manager.readableStreams().empty()); + EXPECT_FALSE(manager.readableUnidirectionalStreams().empty()); + EXPECT_FALSE(manager.peekableStreams().empty()); + + // Remove data from stream. + stream->readBuffer.clear(); + manager.updateReadableStreams(*stream); + + EXPECT_TRUE(manager.readableStreams().empty()); + EXPECT_TRUE(manager.readableUnidirectionalStreams().empty()); +} + +TEST_P(QuicStreamManagerTest, TestUnidirectionalStreamsSeparateSetTwoStreams) { + conn.transportSettings.unidirectionalStreamsReadCallbacksFirst = true; + auto& manager = *conn.streamManager; + + auto stream = manager.createNextBidirectionalStream().value(); + stream->readBuffer.emplace_back( + folly::IOBuf::copyBuffer("and i'm headers"), 0); + + manager.updateReadableStreams(*stream); + manager.updatePeekableStreams(*stream); + + EXPECT_EQ(manager.readableStreams().size(), 1); + EXPECT_EQ(manager.readableUnidirectionalStreams().size(), 0); + + auto stream2 = manager.createNextUnidirectionalStream().value(); + stream2->readBuffer.emplace_back( + folly::IOBuf::copyBuffer("look at me, i am qpack data"), 0); + + manager.updateReadableStreams(*stream2); + manager.updatePeekableStreams(*stream2); + + EXPECT_EQ(manager.readableStreams().size(), 1); + EXPECT_EQ(manager.readableUnidirectionalStreams().size(), 1); +} + TEST_P(QuicStreamManagerTest, WriteBufferMeta) { auto& manager = *conn.streamManager; auto stream = manager.createNextUnidirectionalStream().value();