1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Extra PriorityQueue in QuicStreamManager for DSR streams

Summary:
Real stream data and BufferMeta represented data will be scheduled by different
schedulers. For that reason, this diff adds another PriorityQueue into the stream
manager.

Reviewed By: mjoras

Differential Revision: D26132498

fbshipit-source-id: c69cb671c9a9f975d82efab8f1244a2f3c6c9297
This commit is contained in:
Yang Chi
2021-03-04 08:39:06 -08:00
committed by Facebook GitHub Bot
parent adc1e15eff
commit 89d3179ab8
7 changed files with 70 additions and 6 deletions

View File

@@ -2048,6 +2048,7 @@ TEST_F(QuicTransportFunctionsTest, HasAppDataToWrite) {
conn->flowControlState.peerAdvertisedMaxOffset = 1000; conn->flowControlState.peerAdvertisedMaxOffset = 1000;
conn->flowControlState.sumCurWriteOffset = 800; conn->flowControlState.sumCurWriteOffset = 800;
QuicStreamState stream(0, *conn); QuicStreamState stream(0, *conn);
writeDataToQuicStream(stream, folly::IOBuf::copyBuffer("I'm a devil"), true);
conn->streamManager->addWritable(stream); conn->streamManager->addWritable(stream);
EXPECT_EQ(WriteDataReason::NO_WRITE, hasNonAckDataToWrite(*conn)); EXPECT_EQ(WriteDataReason::NO_WRITE, hasNonAckDataToWrite(*conn));

View File

@@ -58,10 +58,16 @@ void writeBufMetaToQuicStream(
if (data.length > 0) { if (data.length > 0) {
maybeWriteBlockAfterAPIWrite(stream); maybeWriteBlockAfterAPIWrite(stream);
} }
auto realDataLength =
stream.currentWriteOffset + stream.writeBuffer.chainLength();
CHECK_GT(realDataLength, 0)
<< "Real data has to be written to a stream before any buffer meta is"
<< "written to it.";
if (stream.writeBufMeta.offset == 0) { if (stream.writeBufMeta.offset == 0) {
CHECK(!stream.finalWriteOffset.has_value()); CHECK(!stream.finalWriteOffset.has_value())
stream.writeBufMeta.offset = << "Buffer meta cannot be appended to a stream after we have seen EOM "
stream.currentWriteOffset + stream.writeBuffer.chainLength(); << "in real data";
stream.writeBufMeta.offset = realDataLength;
} }
stream.writeBufMeta.length += data.length; stream.writeBufMeta.length += data.length;
if (eof) { if (eof) {

View File

@@ -240,6 +240,7 @@ bool QuicStreamManager::setStreamPriority(
// priority there. // priority there.
writableStreams_.updateIfExist(id, stream->priority); writableStreams_.updateIfExist(id, stream->priority);
lossStreams_.updateIfExist(id, stream->priority); lossStreams_.updateIfExist(id, stream->priority);
writableDSRStreams_.updateIfExist(id, stream->priority);
return true; return true;
} }
return false; return false;
@@ -439,6 +440,7 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
readableStreams_.erase(streamId); readableStreams_.erase(streamId);
peekableStreams_.erase(streamId); peekableStreams_.erase(streamId);
writableStreams_.erase(streamId); writableStreams_.erase(streamId);
writableDSRStreams_.erase(streamId);
writableControlStreams_.erase(streamId); writableControlStreams_.erase(streamId);
blockedStreams_.erase(streamId); blockedStreams_.erase(streamId);
deliverableStreams_.erase(streamId); deliverableStreams_.erase(streamId);
@@ -516,7 +518,8 @@ void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) {
} }
void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) { void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
if (stream.hasWritableData() && !stream.streamWriteError.has_value()) { if (stream.hasWritableDataOrBufMeta() &&
!stream.streamWriteError.has_value()) {
stream.conn.streamManager->addWritable(stream); stream.conn.streamManager->addWritable(stream);
} else { } else {
stream.conn.streamManager->removeWritable(stream); stream.conn.streamManager->removeWritable(stream);

View File

@@ -227,6 +227,10 @@ class QuicStreamManager {
return writableStreams_; return writableStreams_;
} }
auto& writableDSRStreams() {
return writableDSRStreams_;
}
// TODO figure out a better interface here. // TODO figure out a better interface here.
/* /*
* Returns a mutable reference to the container holding the writable stream * Returns a mutable reference to the container holding the writable stream
@@ -240,7 +244,8 @@ class QuicStreamManager {
* Returns if there are any writable streams. * Returns if there are any writable streams.
*/ */
bool hasWritable() const { bool hasWritable() const {
return !writableStreams_.empty() || !writableControlStreams_.empty(); return !writableStreams_.empty() || !writableDSRStreams_.empty() ||
!writableControlStreams_.empty();
} }
/* /*
@@ -250,7 +255,15 @@ class QuicStreamManager {
if (stream.isControl) { if (stream.isControl) {
writableControlStreams_.insert(stream.id); writableControlStreams_.insert(stream.id);
} else { } else {
writableStreams_.insertOrUpdate(stream.id, stream.priority); bool hasPendingBufMeta = stream.hasWritableBufMeta();
bool hasPendingWriteBuf = stream.hasWritableData();
CHECK(hasPendingBufMeta || hasPendingWriteBuf);
if (hasPendingBufMeta) {
writableDSRStreams_.insertOrUpdate(stream.id, stream.priority);
}
if (hasPendingWriteBuf) {
writableStreams_.insertOrUpdate(stream.id, stream.priority);
}
} }
} }
@@ -262,6 +275,7 @@ class QuicStreamManager {
writableControlStreams_.erase(stream.id); writableControlStreams_.erase(stream.id);
} else { } else {
writableStreams_.erase(stream.id); writableStreams_.erase(stream.id);
writableDSRStreams_.erase(stream.id);
} }
} }
@@ -270,6 +284,7 @@ class QuicStreamManager {
*/ */
void clearWritable() { void clearWritable() {
writableStreams_.clear(); writableStreams_.clear();
writableDSRStreams_.clear();
writableControlStreams_.clear(); writableControlStreams_.clear();
} }
@@ -842,6 +857,7 @@ class QuicStreamManager {
// Set of !control streams that have writable data // Set of !control streams that have writable data
PriorityQueue writableStreams_; PriorityQueue writableStreams_;
PriorityQueue writableDSRStreams_;
// Set of control streams that have writable data // Set of control streams that have writable data
std::set<StreamId> writableControlStreams_; std::set<StreamId> writableControlStreams_;

View File

@@ -314,6 +314,23 @@ struct QuicStreamState : public QuicStreamLike {
return false; return false;
} }
FOLLY_NODISCARD bool hasWritableBufMeta() const {
if (writeBufMeta.offset == 0) {
return false;
}
if (writeBufMeta.length > 0) {
return flowControlState.peerAdvertisedMaxOffset - writeBufMeta.offset > 0;
}
if (finalWriteOffset) {
return writeBufMeta.offset <= *finalWriteOffset;
}
return false;
}
FOLLY_NODISCARD bool hasWritableDataOrBufMeta() const {
return hasWritableData() || hasWritableBufMeta();
}
bool hasReadableData() const { bool hasReadableData() const {
return (readBuffer.size() > 0 && return (readBuffer.size() > 0 &&
currentReadOffset == readBuffer.front().offset) || currentReadOffset == readBuffer.front().offset) ||

View File

@@ -1536,6 +1536,7 @@ TEST_F(QuicStreamFunctionsTest, RemovedClosedState) {
auto streamId = stream->id; auto streamId = stream->id;
conn.streamManager->readableStreams().emplace(streamId); conn.streamManager->readableStreams().emplace(streamId);
conn.streamManager->peekableStreams().emplace(streamId); conn.streamManager->peekableStreams().emplace(streamId);
writeDataToQuicStream(*stream, folly::IOBuf::copyBuffer("write data"), true);
conn.streamManager->addWritable(*stream); conn.streamManager->addWritable(*stream);
conn.streamManager->queueBlocked(streamId, 0); conn.streamManager->queueBlocked(streamId, 0);
conn.streamManager->addDeliverable(streamId); conn.streamManager->addDeliverable(streamId);

View File

@@ -318,5 +318,25 @@ TEST_F(QuicStreamManagerTest, TestClearActionable) {
EXPECT_TRUE(manager.peekableStreams().empty()); EXPECT_TRUE(manager.peekableStreams().empty());
} }
TEST_F(QuicStreamManagerTest, WriteBufferMeta) {
auto& manager = *conn.streamManager;
auto stream = manager.createNextUnidirectionalStream().value();
// Add some real data into write buffer
writeDataToQuicStream(*stream, folly::IOBuf::copyBuffer("prefix"), false);
// Artificially remove the stream from writable queue, so that any further
// writable query is about the DSR state.
manager.removeWritable(*stream);
BufferMeta bufferMeta(200);
writeBufMetaToQuicStream(*stream, bufferMeta, true);
EXPECT_TRUE(stream->hasWritableBufMeta());
EXPECT_TRUE(manager.hasWritable());
stream->sendState = StreamSendState::Closed;
stream->recvState = StreamRecvState::Closed;
manager.removeClosedStream(stream->id);
EXPECT_TRUE(manager.writableDSRStreams().empty());
}
} // namespace test } // namespace test
} // namespace quic } // namespace quic