1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-09 20:42:44 +03:00

Merge QUIC new data and loss data scheduler

Summary: they need to be prioritized together

Reviewed By: mjoras

Differential Revision: D26918282

fbshipit-source-id: 061a6135fd7d31280dc4897b00a17371044cee60
This commit is contained in:
Yang Chi
2021-03-10 09:35:18 -08:00
committed by Facebook GitHub Bot
parent 7f39885449
commit 6594defc7c
13 changed files with 196 additions and 197 deletions

View File

@@ -93,8 +93,6 @@ folly::StringPiece writeDataReasonString(WriteDataReason reason) {
return "Crypto"; return "Crypto";
case WriteDataReason::STREAM: case WriteDataReason::STREAM:
return "Stream"; return "Stream";
case WriteDataReason::LOSS:
return "Loss";
case WriteDataReason::BLOCKED: case WriteDataReason::BLOCKED:
return "Blocked"; return "Blocked";
case WriteDataReason::STREAM_WINDOW_UPDATE: case WriteDataReason::STREAM_WINDOW_UPDATE:

View File

@@ -537,7 +537,6 @@ enum class WriteDataReason {
ACK, ACK,
CRYPTO_STREAM, CRYPTO_STREAM,
STREAM, STREAM,
LOSS,
BLOCKED, BLOCKED,
STREAM_WINDOW_UPDATE, STREAM_WINDOW_UPDATE,
CONN_WINDOW_UPDATE, CONN_WINDOW_UPDATE,

View File

@@ -145,11 +145,6 @@ FrameScheduler::Builder::Builder(
packetNumberSpace_(packetNumberSpace), packetNumberSpace_(packetNumberSpace),
name_(std::move(name)) {} name_(std::move(name)) {}
FrameScheduler::Builder& FrameScheduler::Builder::streamRetransmissions() {
retransmissionScheduler_ = true;
return *this;
}
FrameScheduler::Builder& FrameScheduler::Builder::streamFrames() { FrameScheduler::Builder& FrameScheduler::Builder::streamFrames() {
streamFrameScheduler_ = true; streamFrameScheduler_ = true;
return *this; return *this;
@@ -192,9 +187,6 @@ FrameScheduler::Builder& FrameScheduler::Builder::pingFrames() {
FrameScheduler FrameScheduler::Builder::build() && { FrameScheduler FrameScheduler::Builder::build() && {
FrameScheduler scheduler(std::move(name_)); FrameScheduler scheduler(std::move(name_));
if (retransmissionScheduler_) {
scheduler.retransmissionScheduler_.emplace(RetransmissionScheduler(conn_));
}
if (streamFrameScheduler_) { if (streamFrameScheduler_) {
scheduler.streamFrameScheduler_.emplace(StreamFrameScheduler(conn_)); scheduler.streamFrameScheduler_.emplace(StreamFrameScheduler(conn_));
} }
@@ -279,9 +271,6 @@ SchedulingResult FrameScheduler::scheduleFramesForPacket(
if (pingFrameScheduler_ && pingFrameScheduler_->hasPingFrame()) { if (pingFrameScheduler_ && pingFrameScheduler_->hasPingFrame()) {
pingFrameScheduler_->writePing(wrapper); pingFrameScheduler_->writePing(wrapper);
} }
if (retransmissionScheduler_ && retransmissionScheduler_->hasPendingData()) {
retransmissionScheduler_->writeRetransmissionStreams(wrapper);
}
if (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) { if (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) {
streamFrameScheduler_->writeStreams(wrapper); streamFrameScheduler_->writeStreams(wrapper);
} }
@@ -308,8 +297,6 @@ bool FrameScheduler::hasData() const {
bool FrameScheduler::hasImmediateData() const { bool FrameScheduler::hasImmediateData() const {
return (cryptoStreamScheduler_ && cryptoStreamScheduler_->hasData()) || return (cryptoStreamScheduler_ && cryptoStreamScheduler_->hasData()) ||
(retransmissionScheduler_ &&
retransmissionScheduler_->hasPendingData()) ||
(streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) || (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) ||
(rstScheduler_ && rstScheduler_->hasPendingRsts()) || (rstScheduler_ && rstScheduler_->hasPendingRsts()) ||
(windowUpdateScheduler_ && (windowUpdateScheduler_ &&
@@ -324,24 +311,17 @@ folly::StringPiece FrameScheduler::name() const {
return name_; return name_;
} }
RetransmissionScheduler::RetransmissionScheduler( bool StreamFrameScheduler::writeStreamLossBuffers(
const QuicConnectionStateBase& conn)
: conn_(conn) {}
// Return true if this stream wrote some data
bool RetransmissionScheduler::writeStreamLossBuffers(
PacketBuilderInterface& builder, PacketBuilderInterface& builder,
StreamId id) { QuicStreamState& stream) {
auto stream = conn_.streamManager->findStream(id);
CHECK(stream);
bool wroteStreamFrame = false; bool wroteStreamFrame = false;
for (auto buffer = stream->lossBuffer.cbegin(); for (auto buffer = stream.lossBuffer.cbegin();
buffer != stream->lossBuffer.cend(); buffer != stream.lossBuffer.cend();
++buffer) { ++buffer) {
auto bufferLen = buffer->data.chainLength(); auto bufferLen = buffer->data.chainLength();
auto dataLen = writeStreamFrameHeader( auto dataLen = writeStreamFrameHeader(
builder, builder,
stream->id, stream.id,
buffer->offset, buffer->offset,
bufferLen, // writeBufferLen -- only the len of the single buffer. bufferLen, // writeBufferLen -- only the len of the single buffer.
bufferLen, // flowControlLen -- not relevant, already flow controlled. bufferLen, // flowControlLen -- not relevant, already flow controlled.
@@ -350,7 +330,7 @@ bool RetransmissionScheduler::writeStreamLossBuffers(
if (dataLen) { if (dataLen) {
wroteStreamFrame = true; wroteStreamFrame = true;
writeStreamFrameData(builder, buffer->data, *dataLen); writeStreamFrameData(builder, buffer->data, *dataLen);
VLOG(4) << "Wrote retransmitted stream=" << stream->id VLOG(4) << "Wrote loss data for stream=" << stream.id
<< " offset=" << buffer->offset << " bytes=" << *dataLen << " offset=" << buffer->offset << " bytes=" << *dataLen
<< " fin=" << (buffer->eof && *dataLen == bufferLen) << " " << " fin=" << (buffer->eof && *dataLen == bufferLen) << " "
<< conn_; << conn_;
@@ -362,48 +342,31 @@ bool RetransmissionScheduler::writeStreamLossBuffers(
return wroteStreamFrame; return wroteStreamFrame;
} }
void RetransmissionScheduler::writeRetransmissionStreams(
PacketBuilderInterface& builder) {
auto& lossStreams = conn_.streamManager->lossStreams();
for (size_t index = 0;
index < lossStreams.levels.size() && builder.remainingSpaceInPkt() > 0;
index++) {
auto& level = lossStreams.levels[index];
if (level.streams.empty()) {
// No data here, keep going
continue;
}
if (level.incremental) {
// Round robin the streams at this level
MiddleStartingIterationWrapper wrapper(level.streams, level.next);
auto writableStreamItr = wrapper.cbegin();
while (writableStreamItr != wrapper.cend()) {
if (writeStreamLossBuffers(builder, *writableStreamItr)) {
writableStreamItr++;
} else {
// We didn't write anything
break;
}
}
level.next = writableStreamItr.rawIterator();
} else {
// walk the sequential streams in order until we run out of space
for (auto stream : level.streams) {
if (!writeStreamLossBuffers(builder, stream)) {
break;
}
}
}
}
}
bool RetransmissionScheduler::hasPendingData() const {
return !conn_.streamManager->lossStreams().empty();
}
StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn) StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn)
: conn_(conn) {} : conn_(conn) {}
bool StreamFrameScheduler::writeSingleStream(
PacketBuilderInterface& builder,
QuicStreamState& stream,
bool& lossOnly,
uint64_t& connWritableBytes) {
if (!stream.lossBuffer.empty()) {
if (!writeStreamLossBuffers(builder, stream)) {
return false;
}
}
if (!lossOnly && stream.hasWritableData() && connWritableBytes > 0) {
auto streamWrite = writeStreamFrame(builder, stream, connWritableBytes);
if (streamWrite == WriteStreamFrameResult::PACKET_SPACE_ERR) {
return false;
} else if (streamWrite == WriteStreamFrameResult::FLOW_CONTROL_ERR) {
// In this case, we should continue with loss writes in other streams.
lossOnly = true;
}
}
return true;
}
StreamId StreamFrameScheduler::writeStreamsHelper( StreamId StreamFrameScheduler::writeStreamsHelper(
PacketBuilderInterface& builder, PacketBuilderInterface& builder,
const std::set<StreamId>& writableStreams, const std::set<StreamId>& writableStreams,
@@ -416,14 +379,15 @@ StreamId StreamFrameScheduler::writeStreamsHelper(
// stream id. The iterator will wrap around the collection at the end, and we // stream id. The iterator will wrap around the collection at the end, and we
// keep track of the value at the next iteration. This allows us to start // keep track of the value at the next iteration. This allows us to start
// writing at the next stream when building the next packet. // writing at the next stream when building the next packet.
// TODO experiment with writing streams with an actual prioritization scheme. bool lossOnly = false;
while (writableStreamItr != wrapper.cend() && connWritableBytes > 0) { while (writableStreamItr != wrapper.cend()) {
if (writeNextStreamFrame(builder, *writableStreamItr, connWritableBytes)) { auto stream = conn_.streamManager->findStream(*writableStreamItr);
writableStreamItr++; CHECK(stream);
if (streamPerPacket) { if (!writeSingleStream(builder, *stream, lossOnly, connWritableBytes)) {
break; break;
} }
} else { writableStreamItr++;
if (streamPerPacket) {
break; break;
} }
} }
@@ -437,30 +401,29 @@ void StreamFrameScheduler::writeStreamsHelper(
bool streamPerPacket) { bool streamPerPacket) {
// Fill a packet with non-control stream data, in priority order // Fill a packet with non-control stream data, in priority order
for (size_t index = 0; index < writableStreams.levels.size() && for (size_t index = 0; index < writableStreams.levels.size() &&
builder.remainingSpaceInPkt() > 0 && connWritableBytes > 0; builder.remainingSpaceInPkt() > 0;
index++) { index++) {
PriorityQueue::Level& level = writableStreams.levels[index]; PriorityQueue::Level& level = writableStreams.levels[index];
if (level.streams.empty()) { if (level.streams.empty()) {
// No data here, keep going // No data here, keep going
continue; continue;
} }
bool lossOnly = false;
if (level.incremental) { if (level.incremental) {
// Round robin the streams at this level // Round robin the streams at this level
MiddleStartingIterationWrapper wrapper(level.streams, level.next); MiddleStartingIterationWrapper wrapper(level.streams, level.next);
auto writableStreamItr = wrapper.cbegin(); auto writableStreamItr = wrapper.cbegin();
while (writableStreamItr != wrapper.cend() && connWritableBytes > 0) { while (writableStreamItr != wrapper.cend()) {
if (writeNextStreamFrame( auto stream = conn_.streamManager->findStream(*writableStreamItr);
builder, *writableStreamItr, connWritableBytes)) { CHECK(stream);
writableStreamItr++; if (!writeSingleStream(builder, *stream, lossOnly, connWritableBytes)) {
if (streamPerPacket) {
level.next = writableStreamItr.rawIterator();
return;
}
} else {
// Either we filled the packet, ran out of flow control,
// or ran out of data at this level
break; break;
} }
writableStreamItr++;
if (streamPerPacket) {
level.next = writableStreamItr.rawIterator();
return;
}
} }
level.next = writableStreamItr.rawIterator(); level.next = writableStreamItr.rawIterator();
} else { } else {
@@ -468,9 +431,12 @@ void StreamFrameScheduler::writeStreamsHelper(
for (auto streamIt = level.streams.begin(); for (auto streamIt = level.streams.begin();
streamIt != level.streams.end() && connWritableBytes > 0; streamIt != level.streams.end() && connWritableBytes > 0;
++streamIt) { ++streamIt) {
if (!writeNextStreamFrame(builder, *streamIt, connWritableBytes)) { auto stream = conn_.streamManager->findStream(*streamIt);
CHECK(stream);
if (!writeSingleStream(builder, *stream, lossOnly, connWritableBytes)) {
break; break;
} else if (streamPerPacket) { }
if (streamPerPacket) {
return; return;
} }
} }
@@ -513,44 +479,43 @@ bool StreamFrameScheduler::hasPendingData() const {
getSendConnFlowControlBytesWire(conn_) > 0; getSendConnFlowControlBytesWire(conn_) > 0;
} }
bool StreamFrameScheduler::writeNextStreamFrame( StreamFrameScheduler::WriteStreamFrameResult
StreamFrameScheduler::writeStreamFrame(
PacketBuilderInterface& builder, PacketBuilderInterface& builder,
StreamId streamId, QuicStreamState& stream,
uint64_t& connWritableBytes) { uint64_t& connWritableBytes) {
if (builder.remainingSpaceInPkt() == 0) { if (builder.remainingSpaceInPkt() == 0) {
return false; return WriteStreamFrameResult::PACKET_SPACE_ERR;
} }
auto stream = conn_.streamManager->findStream(streamId);
CHECK(stream);
// hasWritableData is the condition which has to be satisfied for the // hasWritableData is the condition which has to be satisfied for the
// stream to be in writableList // stream to be in writableList
DCHECK(stream->hasWritableData()); CHECK(stream.hasWritableData());
uint64_t flowControlLen = uint64_t flowControlLen =
std::min(getSendStreamFlowControlBytesWire(*stream), connWritableBytes); std::min(getSendStreamFlowControlBytesWire(stream), connWritableBytes);
uint64_t bufferLen = stream->writeBuffer.chainLength(); uint64_t bufferLen = stream.writeBuffer.chainLength();
bool canWriteFin = bool canWriteFin =
stream->finalWriteOffset.has_value() && bufferLen <= flowControlLen; stream.finalWriteOffset.has_value() && bufferLen <= flowControlLen;
auto dataLen = writeStreamFrameHeader( auto dataLen = writeStreamFrameHeader(
builder, builder,
stream->id, stream.id,
stream->currentWriteOffset, stream.currentWriteOffset,
bufferLen, bufferLen,
flowControlLen, flowControlLen,
canWriteFin, canWriteFin,
folly::none /* skipLenHint */); folly::none /* skipLenHint */);
if (!dataLen) { if (!dataLen) {
return false; return WriteStreamFrameResult::FLOW_CONTROL_ERR;
} }
writeStreamFrameData(builder, stream->writeBuffer, *dataLen); writeStreamFrameData(builder, stream.writeBuffer, *dataLen);
VLOG(4) << "Wrote stream frame stream=" << stream->id VLOG(4) << "Wrote stream frame stream=" << stream.id
<< " offset=" << stream->currentWriteOffset << " offset=" << stream.currentWriteOffset
<< " bytesWritten=" << *dataLen << " bytesWritten=" << *dataLen
<< " finWritten=" << (canWriteFin && *dataLen == bufferLen) << " " << " finWritten=" << (canWriteFin && *dataLen == bufferLen) << " "
<< conn_; << conn_;
connWritableBytes -= dataLen.value(); connWritableBytes -= dataLen.value();
return true; return WriteStreamFrameResult::SUCCESSFUL;
} }
AckScheduler::AckScheduler( AckScheduler::AckScheduler(

View File

@@ -59,20 +59,6 @@ class QuicPacketScheduler {
virtual folly::StringPiece name() const = 0; virtual folly::StringPiece name() const = 0;
}; };
class RetransmissionScheduler {
public:
explicit RetransmissionScheduler(const QuicConnectionStateBase& conn);
void writeRetransmissionStreams(PacketBuilderInterface& builder);
bool hasPendingData() const;
private:
bool writeStreamLossBuffers(PacketBuilderInterface& builder, StreamId id);
const QuicConnectionStateBase& conn_;
};
class StreamFrameScheduler { class StreamFrameScheduler {
public: public:
explicit StreamFrameScheduler(QuicConnectionStateBase& conn); explicit StreamFrameScheduler(QuicConnectionStateBase& conn);
@@ -86,6 +72,25 @@ class StreamFrameScheduler {
bool hasPendingData() const; bool hasPendingData() const;
private: private:
// Return true if this stream wrote some data
bool writeStreamLossBuffers(
PacketBuilderInterface& builder,
QuicStreamState& stream);
/**
* Write a single stream's write buffer or loss buffer
*
* lossOnly: if only loss buffer should be written. This param may get mutated
* inside the function.
*
* Return: true if write should continue after this stream, false otherwise.
*/
bool writeSingleStream(
PacketBuilderInterface& builder,
QuicStreamState& stream,
bool& lossOnly,
uint64_t& connWritableBytes);
StreamId writeStreamsHelper( StreamId writeStreamsHelper(
PacketBuilderInterface& builder, PacketBuilderInterface& builder,
const std::set<StreamId>& writableStreams, const std::set<StreamId>& writableStreams,
@@ -103,13 +108,18 @@ class StreamFrameScheduler {
* Helper function to write either stream data if stream is not flow * Helper function to write either stream data if stream is not flow
* controlled or a blocked frame otherwise. * controlled or a blocked frame otherwise.
* *
* Return: boolean indicates if anything (either data, or Blocked frame) is * Return: A WriteStreamFrameResult enum indicates if write is
* written into the packet. * successful, and whether the error is due to packet space or flow
* * control.
*/ */
bool writeNextStreamFrame( enum class WriteStreamFrameResult : uint8_t {
SUCCESSFUL,
PACKET_SPACE_ERR,
FLOW_CONTROL_ERR
};
WriteStreamFrameResult writeStreamFrame(
PacketBuilderInterface& builder, PacketBuilderInterface& builder,
StreamId streamId, QuicStreamState& stream,
uint64_t& connWritableBytes); uint64_t& connWritableBytes);
QuicConnectionStateBase& conn_; QuicConnectionStateBase& conn_;
@@ -237,7 +247,6 @@ class FrameScheduler : public QuicPacketScheduler {
PacketNumberSpace packetNumberSpace, PacketNumberSpace packetNumberSpace,
folly::StringPiece name); folly::StringPiece name);
Builder& streamRetransmissions();
Builder& streamFrames(); Builder& streamFrames();
Builder& ackFrames(); Builder& ackFrames();
Builder& resetFrames(); Builder& resetFrames();
@@ -256,7 +265,6 @@ class FrameScheduler : public QuicPacketScheduler {
folly::StringPiece name_; folly::StringPiece name_;
// schedulers // schedulers
bool retransmissionScheduler_{false};
bool streamFrameScheduler_{false}; bool streamFrameScheduler_{false};
bool ackScheduler_{false}; bool ackScheduler_{false};
bool rstScheduler_{false}; bool rstScheduler_{false};
@@ -282,7 +290,6 @@ class FrameScheduler : public QuicPacketScheduler {
FOLLY_NODISCARD folly::StringPiece name() const override; FOLLY_NODISCARD folly::StringPiece name() const override;
private: private:
folly::Optional<RetransmissionScheduler> retransmissionScheduler_;
folly::Optional<StreamFrameScheduler> streamFrameScheduler_; folly::Optional<StreamFrameScheduler> streamFrameScheduler_;
folly::Optional<AckScheduler> ackScheduler_; folly::Optional<AckScheduler> ackScheduler_;
folly::Optional<RstStreamScheduler> rstScheduler_; folly::Optional<RstStreamScheduler> rstScheduler_;

View File

@@ -121,7 +121,6 @@ uint64_t writeQuicDataToSocketImpl(
.simpleFrames() .simpleFrames()
.resetFrames() .resetFrames()
.streamFrames() .streamFrames()
.streamRetransmissions()
.pingFrames(); .pingFrames();
if (!exceptCryptoStream) { if (!exceptCryptoStream) {
probeSchedulerBuilder.cryptoFrames(); probeSchedulerBuilder.cryptoFrames();
@@ -152,7 +151,6 @@ uint64_t writeQuicDataToSocketImpl(
exceptCryptoStream ? "FrameSchedulerWithoutCrypto" : "FrameScheduler") exceptCryptoStream ? "FrameSchedulerWithoutCrypto" : "FrameScheduler")
.streamFrames() .streamFrames()
.ackFrames() .ackFrames()
.streamRetransmissions()
.resetFrames() .resetFrames()
.windowUpdateFrames() .windowUpdateFrames()
.blockedFrames() .blockedFrames()
@@ -591,9 +589,9 @@ void updateConnection(
updateFlowControlOnWriteToSocket(*stream, writeStreamFrame.len); updateFlowControlOnWriteToSocket(*stream, writeStreamFrame.len);
maybeWriteBlockAfterSocketWrite(*stream); maybeWriteBlockAfterSocketWrite(*stream);
maybeWriteDataBlockedAfterSocketWrite(conn); maybeWriteDataBlockedAfterSocketWrite(conn);
conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->addTx(writeStreamFrame.streamId); conn.streamManager->addTx(writeStreamFrame.streamId);
} }
conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->updateLossStreams(*stream); conn.streamManager->updateLossStreams(*stream);
break; break;
} }
@@ -1033,7 +1031,6 @@ uint64_t writeZeroRttDataToSocket(
LongHeader::typeToPacketNumberSpace(type), LongHeader::typeToPacketNumberSpace(type),
"ZeroRttScheduler") "ZeroRttScheduler")
.streamFrames() .streamFrames()
.streamRetransmissions()
.resetFrames() .resetFrames()
.windowUpdateFrames() .windowUpdateFrames()
.blockedFrames() .blockedFrames()
@@ -1536,9 +1533,6 @@ WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn) {
if (conn.streamManager->hasBlocked()) { if (conn.streamManager->hasBlocked()) {
return WriteDataReason::BLOCKED; return WriteDataReason::BLOCKED;
} }
if (conn.streamManager->hasLoss()) {
return WriteDataReason::LOSS;
}
if (getSendConnFlowControlBytesWire(conn) != 0 && if (getSendConnFlowControlBytesWire(conn) != 0 &&
conn.streamManager->hasWritable()) { conn.streamManager->hasWritable()) {
return WriteDataReason::STREAM; return WriteDataReason::STREAM;

View File

@@ -1508,6 +1508,47 @@ TEST_F(
EXPECT_EQ(buf->length(), 0); EXPECT_EQ(buf->length(), 0);
} }
TEST_F(QuicPacketSchedulerTest, HighPriNewDataBeforeLowPriLossData) {
QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build());
conn.streamManager->setMaxLocalBidirectionalStreams(10);
conn.flowControlState.peerAdvertisedMaxOffset = 100000;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000;
auto lowPriStreamId =
(*conn.streamManager->createNextBidirectionalStream())->id;
auto highPriStreamId =
(*conn.streamManager->createNextBidirectionalStream())->id;
auto lowPriStream = conn.streamManager->findStream(lowPriStreamId);
auto highPriStream = conn.streamManager->findStream(highPriStreamId);
lowPriStream->lossBuffer.push_back(
StreamBuffer(folly::IOBuf::copyBuffer("Onegin"), 0, false));
conn.streamManager->updateWritableStreams(*lowPriStream);
conn.streamManager->setStreamPriority(lowPriStream->id, 5, false);
auto inBuf = buildRandomInputData(conn.udpSendPacketLen * 10);
writeDataToQuicStream(*highPriStream, inBuf->clone(), false);
conn.streamManager->updateWritableStreams(*highPriStream);
conn.streamManager->setStreamPriority(highPriStream->id, 0, false);
StreamFrameScheduler scheduler(conn);
ShortHeader shortHeader(
ProtectionType::KeyPhaseZero,
getTestConnectionId(),
getNextPacketNum(conn, PacketNumberSpace::AppData));
RegularQuicPacketBuilder builder(
conn.udpSendPacketLen,
std::move(shortHeader),
conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0));
builder.encodePacketHeader();
scheduler.writeStreams(builder);
auto packet = std::move(builder).buildPacket().packet;
EXPECT_EQ(1, packet.frames.size());
auto& writeStreamFrame = *packet.frames[0].asWriteStreamFrame();
EXPECT_EQ(highPriStream->id, writeStreamFrame.streamId);
EXPECT_FALSE(lowPriStream->lossBuffer.empty());
}
INSTANTIATE_TEST_CASE_P( INSTANTIATE_TEST_CASE_P(
QuicPacketSchedulerTests, QuicPacketSchedulerTests,
QuicPacketSchedulerTest, QuicPacketSchedulerTest,

View File

@@ -147,9 +147,8 @@ void dropPackets(QuicServerConnectionState& conn) {
}), }),
std::move(*itr->second)); std::move(*itr->second));
stream->retransmissionBuffer.erase(itr); stream->retransmissionBuffer.erase(itr);
if (conn.streamManager->lossStreams().count(streamFrame->streamId) == 0) { conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->addLoss(streamFrame->streamId); conn.streamManager->updateLossStreams(*stream);
}
} }
} }
conn.outstandings.packets.clear(); conn.outstandings.packets.clear();

View File

@@ -4628,11 +4628,10 @@ TEST_F(QuicClientTransportAfterStartTest, ResetClearsPendingLoss) {
RegularQuicWritePacket* forceLossPacket = RegularQuicWritePacket* forceLossPacket =
CHECK_NOTNULL(findPacketWithStream(client->getNonConstConn(), streamId)); CHECK_NOTNULL(findPacketWithStream(client->getNonConstConn(), streamId));
markPacketLoss(client->getNonConstConn(), *forceLossPacket, false); markPacketLoss(client->getNonConstConn(), *forceLossPacket, false);
auto& pendingLossStreams = client->getConn().streamManager->lossStreams(); ASSERT_TRUE(client->getConn().streamManager->hasLoss());
ASSERT_TRUE(pendingLossStreams.count(streamId) > 0);
client->resetStream(streamId, GenericApplicationErrorCode::UNKNOWN); client->resetStream(streamId, GenericApplicationErrorCode::UNKNOWN);
ASSERT_TRUE(pendingLossStreams.count(streamId) == 0); ASSERT_FALSE(client->getConn().streamManager->hasLoss());
} }
TEST_F(QuicClientTransportAfterStartTest, LossAfterResetStream) { TEST_F(QuicClientTransportAfterStartTest, LossAfterResetStream) {
@@ -4653,8 +4652,7 @@ TEST_F(QuicClientTransportAfterStartTest, LossAfterResetStream) {
auto stream = CHECK_NOTNULL( auto stream = CHECK_NOTNULL(
client->getNonConstConn().streamManager->getStream(streamId)); client->getNonConstConn().streamManager->getStream(streamId));
ASSERT_TRUE(stream->lossBuffer.empty()); ASSERT_TRUE(stream->lossBuffer.empty());
auto& pendingLossStreams = client->getConn().streamManager->lossStreams(); ASSERT_FALSE(client->getConn().streamManager->hasLoss());
ASSERT_TRUE(pendingLossStreams.count(streamId) == 0);
} }
TEST_F(QuicClientTransportAfterStartTest, SendResetAfterEom) { TEST_F(QuicClientTransportAfterStartTest, SendResetAfterEom) {

View File

@@ -113,7 +113,8 @@ void markPacketLoss(
} }
stream->insertIntoLossBuffer(std::move(bufferItr->second)); stream->insertIntoLossBuffer(std::move(bufferItr->second));
stream->retransmissionBuffer.erase(bufferItr); stream->retransmissionBuffer.erase(bufferItr);
conn.streamManager->updateLossStreams(*stream); conn.streamManager->updateWritableStreams(*stream);
conn.streamManager->addLoss(stream->id);
break; break;
} }
case QuicWriteFrame::Type::WriteCryptoFrame: { case QuicWriteFrame::Type::WriteCryptoFrame: {

View File

@@ -239,7 +239,6 @@ bool QuicStreamManager::setStreamPriority(
// If this stream is already in the writable or loss queus, update the // If this stream is already in the writable or loss queus, update the
// priority there. // priority there.
writableStreams_.updateIfExist(id, stream->priority); writableStreams_.updateIfExist(id, stream->priority);
lossStreams_.updateIfExist(id, stream->priority);
writableDSRStreams_.updateIfExist(id, stream->priority); writableDSRStreams_.updateIfExist(id, stream->priority);
return true; return true;
} }
@@ -442,11 +441,11 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
writableStreams_.erase(streamId); writableStreams_.erase(streamId);
writableDSRStreams_.erase(streamId); writableDSRStreams_.erase(streamId);
writableControlStreams_.erase(streamId); writableControlStreams_.erase(streamId);
removeLoss(streamId);
blockedStreams_.erase(streamId); blockedStreams_.erase(streamId);
deliverableStreams_.erase(streamId); deliverableStreams_.erase(streamId);
txStreams_.erase(streamId); txStreams_.erase(streamId);
windowUpdates_.erase(streamId); windowUpdates_.erase(streamId);
lossStreams_.erase(streamId);
stopSendingStreams_.erase(streamId); stopSendingStreams_.erase(streamId);
flowControlUpdated_.erase(streamId); flowControlUpdated_.erase(streamId);
if (it->second.isControl) { if (it->second.isControl) {
@@ -498,16 +497,6 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
updateAppIdleState(); updateAppIdleState();
} }
void QuicStreamManager::updateLossStreams(QuicStreamState& stream) {
if (stream.lossBuffer.empty()) {
// No-op if not present
lossStreams_.erase(stream.id);
} else {
// No-op if already inserted
lossStreams_.insertOrUpdate(stream.id, stream.priority);
}
}
void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) { void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) {
updateHolBlockedTime(stream); updateHolBlockedTime(stream);
if (stream.hasReadableData() || stream.streamReadError.has_value()) { if (stream.hasReadableData() || stream.streamReadError.has_value()) {
@@ -518,11 +507,23 @@ void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) {
} }
void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) { void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
if (stream.hasWritableDataOrBufMeta() && if (stream.streamWriteError.has_value()) {
!stream.streamWriteError.has_value()) { CHECK(stream.lossBuffer.empty());
stream.conn.streamManager->addWritable(stream); removeWritable(stream);
return;
}
if (stream.hasWritableData() || !stream.lossBuffer.empty()) {
addWritable(stream);
} else { } else {
stream.conn.streamManager->removeWritable(stream); removeWritable(stream);
}
if (stream.isControl) {
return;
}
if (stream.hasWritableBufMeta()) {
addDSRWritable(stream);
} else {
removeDSRWritable(stream);
} }
} }

View File

@@ -105,13 +105,6 @@ class QuicStreamManager {
*/ */
void updateWritableStreams(QuicStreamState& stream); void updateWritableStreams(QuicStreamState& stream);
/*
* Update the current loss streams for the given stream state. This will
* either add or remove it from the collection of streams with outstanding
* loss.
*/
void updateLossStreams(QuicStreamState& stream);
/* /*
* Find a open and active (we have created state for it) stream and return its * Find a open and active (we have created state for it) stream and return its
* state. * state.
@@ -195,22 +188,26 @@ class QuicStreamManager {
} }
} }
auto& lossStreams() const { FOLLY_NODISCARD bool hasLoss() const {
return lossStreams_;
}
// This should be only used in testing code.
void addLoss(StreamId streamId) {
auto stream = findStream(streamId);
if (stream) {
lossStreams_.insertOrUpdate(streamId, stream->priority);
}
}
bool hasLoss() const {
return !lossStreams_.empty(); return !lossStreams_.empty();
} }
void removeLoss(StreamId id) {
lossStreams_.erase(id);
}
void addLoss(StreamId id) {
lossStreams_.insert(id);
}
void updateLossStreams(const QuicStreamState& stream) {
if (stream.lossBuffer.empty()) {
removeLoss(stream.id);
} else {
addLoss(stream.id);
}
}
/** /**
* Update stream priority if the stream indicated by id exists, and the * Update stream priority if the stream indicated by id exists, and the
* passed in values are different from current priority. Return true if * passed in values are different from current priority. Return true if
@@ -255,18 +252,17 @@ class QuicStreamManager {
if (stream.isControl) { if (stream.isControl) {
writableControlStreams_.insert(stream.id); writableControlStreams_.insert(stream.id);
} else { } else {
bool hasPendingBufMeta = stream.hasWritableBufMeta(); CHECK(stream.hasWritableData() || !stream.lossBuffer.empty());
bool hasPendingWriteBuf = stream.hasWritableData(); writableStreams_.insertOrUpdate(stream.id, stream.priority);
CHECK(hasPendingBufMeta || hasPendingWriteBuf);
if (hasPendingBufMeta) {
writableDSRStreams_.insertOrUpdate(stream.id, stream.priority);
}
if (hasPendingWriteBuf) {
writableStreams_.insertOrUpdate(stream.id, stream.priority);
}
} }
} }
void addDSRWritable(const QuicStreamState& stream) {
CHECK(!stream.isControl);
CHECK(stream.hasWritableBufMeta());
writableDSRStreams_.insertOrUpdate(stream.id, stream.priority);
}
/* /*
* Remove a writable stream id. * Remove a writable stream id.
*/ */
@@ -275,10 +271,14 @@ class QuicStreamManager {
writableControlStreams_.erase(stream.id); writableControlStreams_.erase(stream.id);
} else { } else {
writableStreams_.erase(stream.id); writableStreams_.erase(stream.id);
writableDSRStreams_.erase(stream.id);
} }
} }
void removeDSRWritable(const QuicStreamState& stream) {
CHECK(!stream.isControl);
writableDSRStreams_.erase(stream.id);
}
/* /*
* Clear the writable streams. * Clear the writable streams.
*/ */
@@ -846,8 +846,8 @@ class QuicStreamManager {
// Streams that had their flow control updated // Streams that had their flow control updated
folly::F14FastSet<StreamId> flowControlUpdated_; folly::F14FastSet<StreamId> flowControlUpdated_;
// Data structure to keep track of stream that have detected lost data // Streams that have bytes in loss buffer
PriorityQueue lossStreams_; folly::F14FastSet<StreamId> lossStreams_;
// Set of streams that have pending reads // Set of streams that have pending reads
folly::F14FastSet<StreamId> readableStreams_; folly::F14FastSet<StreamId> readableStreams_;

View File

@@ -327,10 +327,6 @@ struct QuicStreamState : public QuicStreamLike {
return false; return false;
} }
FOLLY_NODISCARD bool hasWritableDataOrBufMeta() const {
return hasWritableData() || hasWritableBufMeta();
}
bool hasReadableData() const { bool hasReadableData() const {
return (readBuffer.size() > 0 && return (readBuffer.size() > 0 &&
currentReadOffset == readBuffer.front().offset) || currentReadOffset == readBuffer.front().offset) ||

View File

@@ -21,7 +21,7 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) {
stream.streamWriteError = error; stream.streamWriteError = error;
stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateReadableStreams(stream);
stream.conn.streamManager->updateWritableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream);
stream.conn.streamManager->updateLossStreams(stream); stream.conn.streamManager->removeLoss(stream.id);
} }
void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) { void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) {