diff --git a/quic/QuicConstants.cpp b/quic/QuicConstants.cpp index a61972487..6cec72c99 100644 --- a/quic/QuicConstants.cpp +++ b/quic/QuicConstants.cpp @@ -93,8 +93,6 @@ folly::StringPiece writeDataReasonString(WriteDataReason reason) { return "Crypto"; case WriteDataReason::STREAM: return "Stream"; - case WriteDataReason::LOSS: - return "Loss"; case WriteDataReason::BLOCKED: return "Blocked"; case WriteDataReason::STREAM_WINDOW_UPDATE: diff --git a/quic/QuicConstants.h b/quic/QuicConstants.h index db9fc8355..6c20dbb58 100644 --- a/quic/QuicConstants.h +++ b/quic/QuicConstants.h @@ -537,7 +537,6 @@ enum class WriteDataReason { ACK, CRYPTO_STREAM, STREAM, - LOSS, BLOCKED, STREAM_WINDOW_UPDATE, CONN_WINDOW_UPDATE, diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index f162f8d4e..29fa3a157 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -145,11 +145,6 @@ FrameScheduler::Builder::Builder( packetNumberSpace_(packetNumberSpace), name_(std::move(name)) {} -FrameScheduler::Builder& FrameScheduler::Builder::streamRetransmissions() { - retransmissionScheduler_ = true; - return *this; -} - FrameScheduler::Builder& FrameScheduler::Builder::streamFrames() { streamFrameScheduler_ = true; return *this; @@ -192,9 +187,6 @@ FrameScheduler::Builder& FrameScheduler::Builder::pingFrames() { FrameScheduler FrameScheduler::Builder::build() && { FrameScheduler scheduler(std::move(name_)); - if (retransmissionScheduler_) { - scheduler.retransmissionScheduler_.emplace(RetransmissionScheduler(conn_)); - } if (streamFrameScheduler_) { scheduler.streamFrameScheduler_.emplace(StreamFrameScheduler(conn_)); } @@ -279,9 +271,6 @@ SchedulingResult FrameScheduler::scheduleFramesForPacket( if (pingFrameScheduler_ && pingFrameScheduler_->hasPingFrame()) { pingFrameScheduler_->writePing(wrapper); } - if (retransmissionScheduler_ && retransmissionScheduler_->hasPendingData()) { - retransmissionScheduler_->writeRetransmissionStreams(wrapper); - } if (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) { streamFrameScheduler_->writeStreams(wrapper); } @@ -308,8 +297,6 @@ bool FrameScheduler::hasData() const { bool FrameScheduler::hasImmediateData() const { return (cryptoStreamScheduler_ && cryptoStreamScheduler_->hasData()) || - (retransmissionScheduler_ && - retransmissionScheduler_->hasPendingData()) || (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) || (rstScheduler_ && rstScheduler_->hasPendingRsts()) || (windowUpdateScheduler_ && @@ -324,24 +311,17 @@ folly::StringPiece FrameScheduler::name() const { return name_; } -RetransmissionScheduler::RetransmissionScheduler( - const QuicConnectionStateBase& conn) - : conn_(conn) {} - -// Return true if this stream wrote some data -bool RetransmissionScheduler::writeStreamLossBuffers( +bool StreamFrameScheduler::writeStreamLossBuffers( PacketBuilderInterface& builder, - StreamId id) { - auto stream = conn_.streamManager->findStream(id); - CHECK(stream); + QuicStreamState& stream) { bool wroteStreamFrame = false; - for (auto buffer = stream->lossBuffer.cbegin(); - buffer != stream->lossBuffer.cend(); + for (auto buffer = stream.lossBuffer.cbegin(); + buffer != stream.lossBuffer.cend(); ++buffer) { auto bufferLen = buffer->data.chainLength(); auto dataLen = writeStreamFrameHeader( builder, - stream->id, + stream.id, buffer->offset, bufferLen, // writeBufferLen -- only the len of the single buffer. bufferLen, // flowControlLen -- not relevant, already flow controlled. @@ -350,7 +330,7 @@ bool RetransmissionScheduler::writeStreamLossBuffers( if (dataLen) { wroteStreamFrame = true; writeStreamFrameData(builder, buffer->data, *dataLen); - VLOG(4) << "Wrote retransmitted stream=" << stream->id + VLOG(4) << "Wrote loss data for stream=" << stream.id << " offset=" << buffer->offset << " bytes=" << *dataLen << " fin=" << (buffer->eof && *dataLen == bufferLen) << " " << conn_; @@ -362,48 +342,31 @@ bool RetransmissionScheduler::writeStreamLossBuffers( return wroteStreamFrame; } -void RetransmissionScheduler::writeRetransmissionStreams( - PacketBuilderInterface& builder) { - auto& lossStreams = conn_.streamManager->lossStreams(); - for (size_t index = 0; - index < lossStreams.levels.size() && builder.remainingSpaceInPkt() > 0; - index++) { - auto& level = lossStreams.levels[index]; - if (level.streams.empty()) { - // No data here, keep going - continue; - } - if (level.incremental) { - // Round robin the streams at this level - MiddleStartingIterationWrapper wrapper(level.streams, level.next); - auto writableStreamItr = wrapper.cbegin(); - while (writableStreamItr != wrapper.cend()) { - if (writeStreamLossBuffers(builder, *writableStreamItr)) { - writableStreamItr++; - } else { - // We didn't write anything - break; - } - } - level.next = writableStreamItr.rawIterator(); - } else { - // walk the sequential streams in order until we run out of space - for (auto stream : level.streams) { - if (!writeStreamLossBuffers(builder, stream)) { - break; - } - } - } - } -} - -bool RetransmissionScheduler::hasPendingData() const { - return !conn_.streamManager->lossStreams().empty(); -} - StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn) : conn_(conn) {} +bool StreamFrameScheduler::writeSingleStream( + PacketBuilderInterface& builder, + QuicStreamState& stream, + bool& lossOnly, + uint64_t& connWritableBytes) { + if (!stream.lossBuffer.empty()) { + if (!writeStreamLossBuffers(builder, stream)) { + return false; + } + } + if (!lossOnly && stream.hasWritableData() && connWritableBytes > 0) { + auto streamWrite = writeStreamFrame(builder, stream, connWritableBytes); + if (streamWrite == WriteStreamFrameResult::PACKET_SPACE_ERR) { + return false; + } else if (streamWrite == WriteStreamFrameResult::FLOW_CONTROL_ERR) { + // In this case, we should continue with loss writes in other streams. + lossOnly = true; + } + } + return true; +} + StreamId StreamFrameScheduler::writeStreamsHelper( PacketBuilderInterface& builder, const std::set& writableStreams, @@ -416,14 +379,15 @@ StreamId StreamFrameScheduler::writeStreamsHelper( // stream id. The iterator will wrap around the collection at the end, and we // keep track of the value at the next iteration. This allows us to start // writing at the next stream when building the next packet. - // TODO experiment with writing streams with an actual prioritization scheme. - while (writableStreamItr != wrapper.cend() && connWritableBytes > 0) { - if (writeNextStreamFrame(builder, *writableStreamItr, connWritableBytes)) { - writableStreamItr++; - if (streamPerPacket) { - break; - } - } else { + bool lossOnly = false; + while (writableStreamItr != wrapper.cend()) { + auto stream = conn_.streamManager->findStream(*writableStreamItr); + CHECK(stream); + if (!writeSingleStream(builder, *stream, lossOnly, connWritableBytes)) { + break; + } + writableStreamItr++; + if (streamPerPacket) { break; } } @@ -437,30 +401,29 @@ void StreamFrameScheduler::writeStreamsHelper( bool streamPerPacket) { // Fill a packet with non-control stream data, in priority order for (size_t index = 0; index < writableStreams.levels.size() && - builder.remainingSpaceInPkt() > 0 && connWritableBytes > 0; + builder.remainingSpaceInPkt() > 0; index++) { PriorityQueue::Level& level = writableStreams.levels[index]; if (level.streams.empty()) { // No data here, keep going continue; } + bool lossOnly = false; if (level.incremental) { // Round robin the streams at this level MiddleStartingIterationWrapper wrapper(level.streams, level.next); auto writableStreamItr = wrapper.cbegin(); - while (writableStreamItr != wrapper.cend() && connWritableBytes > 0) { - if (writeNextStreamFrame( - builder, *writableStreamItr, connWritableBytes)) { - writableStreamItr++; - if (streamPerPacket) { - level.next = writableStreamItr.rawIterator(); - return; - } - } else { - // Either we filled the packet, ran out of flow control, - // or ran out of data at this level + while (writableStreamItr != wrapper.cend()) { + auto stream = conn_.streamManager->findStream(*writableStreamItr); + CHECK(stream); + if (!writeSingleStream(builder, *stream, lossOnly, connWritableBytes)) { break; } + writableStreamItr++; + if (streamPerPacket) { + level.next = writableStreamItr.rawIterator(); + return; + } } level.next = writableStreamItr.rawIterator(); } else { @@ -468,9 +431,12 @@ void StreamFrameScheduler::writeStreamsHelper( for (auto streamIt = level.streams.begin(); streamIt != level.streams.end() && connWritableBytes > 0; ++streamIt) { - if (!writeNextStreamFrame(builder, *streamIt, connWritableBytes)) { + auto stream = conn_.streamManager->findStream(*streamIt); + CHECK(stream); + if (!writeSingleStream(builder, *stream, lossOnly, connWritableBytes)) { break; - } else if (streamPerPacket) { + } + if (streamPerPacket) { return; } } @@ -513,44 +479,43 @@ bool StreamFrameScheduler::hasPendingData() const { getSendConnFlowControlBytesWire(conn_) > 0; } -bool StreamFrameScheduler::writeNextStreamFrame( +StreamFrameScheduler::WriteStreamFrameResult +StreamFrameScheduler::writeStreamFrame( PacketBuilderInterface& builder, - StreamId streamId, + QuicStreamState& stream, uint64_t& connWritableBytes) { if (builder.remainingSpaceInPkt() == 0) { - return false; + return WriteStreamFrameResult::PACKET_SPACE_ERR; } - auto stream = conn_.streamManager->findStream(streamId); - CHECK(stream); // hasWritableData is the condition which has to be satisfied for the // stream to be in writableList - DCHECK(stream->hasWritableData()); + CHECK(stream.hasWritableData()); uint64_t flowControlLen = - std::min(getSendStreamFlowControlBytesWire(*stream), connWritableBytes); - uint64_t bufferLen = stream->writeBuffer.chainLength(); + std::min(getSendStreamFlowControlBytesWire(stream), connWritableBytes); + uint64_t bufferLen = stream.writeBuffer.chainLength(); bool canWriteFin = - stream->finalWriteOffset.has_value() && bufferLen <= flowControlLen; + stream.finalWriteOffset.has_value() && bufferLen <= flowControlLen; auto dataLen = writeStreamFrameHeader( builder, - stream->id, - stream->currentWriteOffset, + stream.id, + stream.currentWriteOffset, bufferLen, flowControlLen, canWriteFin, folly::none /* skipLenHint */); if (!dataLen) { - return false; + return WriteStreamFrameResult::FLOW_CONTROL_ERR; } - writeStreamFrameData(builder, stream->writeBuffer, *dataLen); - VLOG(4) << "Wrote stream frame stream=" << stream->id - << " offset=" << stream->currentWriteOffset + writeStreamFrameData(builder, stream.writeBuffer, *dataLen); + VLOG(4) << "Wrote stream frame stream=" << stream.id + << " offset=" << stream.currentWriteOffset << " bytesWritten=" << *dataLen << " finWritten=" << (canWriteFin && *dataLen == bufferLen) << " " << conn_; connWritableBytes -= dataLen.value(); - return true; + return WriteStreamFrameResult::SUCCESSFUL; } AckScheduler::AckScheduler( diff --git a/quic/api/QuicPacketScheduler.h b/quic/api/QuicPacketScheduler.h index baed8c6d8..68688c799 100644 --- a/quic/api/QuicPacketScheduler.h +++ b/quic/api/QuicPacketScheduler.h @@ -59,20 +59,6 @@ class QuicPacketScheduler { virtual folly::StringPiece name() const = 0; }; -class RetransmissionScheduler { - public: - explicit RetransmissionScheduler(const QuicConnectionStateBase& conn); - - void writeRetransmissionStreams(PacketBuilderInterface& builder); - - bool hasPendingData() const; - - private: - bool writeStreamLossBuffers(PacketBuilderInterface& builder, StreamId id); - - const QuicConnectionStateBase& conn_; -}; - class StreamFrameScheduler { public: explicit StreamFrameScheduler(QuicConnectionStateBase& conn); @@ -86,6 +72,25 @@ class StreamFrameScheduler { bool hasPendingData() const; private: + // Return true if this stream wrote some data + bool writeStreamLossBuffers( + PacketBuilderInterface& builder, + QuicStreamState& stream); + + /** + * Write a single stream's write buffer or loss buffer + * + * lossOnly: if only loss buffer should be written. This param may get mutated + * inside the function. + * + * Return: true if write should continue after this stream, false otherwise. + */ + bool writeSingleStream( + PacketBuilderInterface& builder, + QuicStreamState& stream, + bool& lossOnly, + uint64_t& connWritableBytes); + StreamId writeStreamsHelper( PacketBuilderInterface& builder, const std::set& writableStreams, @@ -103,13 +108,18 @@ class StreamFrameScheduler { * Helper function to write either stream data if stream is not flow * controlled or a blocked frame otherwise. * - * Return: boolean indicates if anything (either data, or Blocked frame) is - * written into the packet. - * + * Return: A WriteStreamFrameResult enum indicates if write is + * successful, and whether the error is due to packet space or flow + * control. */ - bool writeNextStreamFrame( + enum class WriteStreamFrameResult : uint8_t { + SUCCESSFUL, + PACKET_SPACE_ERR, + FLOW_CONTROL_ERR + }; + WriteStreamFrameResult writeStreamFrame( PacketBuilderInterface& builder, - StreamId streamId, + QuicStreamState& stream, uint64_t& connWritableBytes); QuicConnectionStateBase& conn_; @@ -237,7 +247,6 @@ class FrameScheduler : public QuicPacketScheduler { PacketNumberSpace packetNumberSpace, folly::StringPiece name); - Builder& streamRetransmissions(); Builder& streamFrames(); Builder& ackFrames(); Builder& resetFrames(); @@ -256,7 +265,6 @@ class FrameScheduler : public QuicPacketScheduler { folly::StringPiece name_; // schedulers - bool retransmissionScheduler_{false}; bool streamFrameScheduler_{false}; bool ackScheduler_{false}; bool rstScheduler_{false}; @@ -282,7 +290,6 @@ class FrameScheduler : public QuicPacketScheduler { FOLLY_NODISCARD folly::StringPiece name() const override; private: - folly::Optional retransmissionScheduler_; folly::Optional streamFrameScheduler_; folly::Optional ackScheduler_; folly::Optional rstScheduler_; diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 68259f268..8dffe3369 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -121,7 +121,6 @@ uint64_t writeQuicDataToSocketImpl( .simpleFrames() .resetFrames() .streamFrames() - .streamRetransmissions() .pingFrames(); if (!exceptCryptoStream) { probeSchedulerBuilder.cryptoFrames(); @@ -152,7 +151,6 @@ uint64_t writeQuicDataToSocketImpl( exceptCryptoStream ? "FrameSchedulerWithoutCrypto" : "FrameScheduler") .streamFrames() .ackFrames() - .streamRetransmissions() .resetFrames() .windowUpdateFrames() .blockedFrames() @@ -591,9 +589,9 @@ void updateConnection( updateFlowControlOnWriteToSocket(*stream, writeStreamFrame.len); maybeWriteBlockAfterSocketWrite(*stream); maybeWriteDataBlockedAfterSocketWrite(conn); - conn.streamManager->updateWritableStreams(*stream); conn.streamManager->addTx(writeStreamFrame.streamId); } + conn.streamManager->updateWritableStreams(*stream); conn.streamManager->updateLossStreams(*stream); break; } @@ -1033,7 +1031,6 @@ uint64_t writeZeroRttDataToSocket( LongHeader::typeToPacketNumberSpace(type), "ZeroRttScheduler") .streamFrames() - .streamRetransmissions() .resetFrames() .windowUpdateFrames() .blockedFrames() @@ -1536,9 +1533,6 @@ WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn) { if (conn.streamManager->hasBlocked()) { return WriteDataReason::BLOCKED; } - if (conn.streamManager->hasLoss()) { - return WriteDataReason::LOSS; - } if (getSendConnFlowControlBytesWire(conn) != 0 && conn.streamManager->hasWritable()) { return WriteDataReason::STREAM; diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 552873744..4ff590b95 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -1508,6 +1508,47 @@ TEST_F( EXPECT_EQ(buf->length(), 0); } +TEST_F(QuicPacketSchedulerTest, HighPriNewDataBeforeLowPriLossData) { + QuicServerConnectionState conn( + FizzServerQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote = 100000; + auto lowPriStreamId = + (*conn.streamManager->createNextBidirectionalStream())->id; + auto highPriStreamId = + (*conn.streamManager->createNextBidirectionalStream())->id; + auto lowPriStream = conn.streamManager->findStream(lowPriStreamId); + auto highPriStream = conn.streamManager->findStream(highPriStreamId); + + lowPriStream->lossBuffer.push_back( + StreamBuffer(folly::IOBuf::copyBuffer("Onegin"), 0, false)); + conn.streamManager->updateWritableStreams(*lowPriStream); + conn.streamManager->setStreamPriority(lowPriStream->id, 5, false); + + auto inBuf = buildRandomInputData(conn.udpSendPacketLen * 10); + writeDataToQuicStream(*highPriStream, inBuf->clone(), false); + conn.streamManager->updateWritableStreams(*highPriStream); + conn.streamManager->setStreamPriority(highPriStream->id, 0, false); + + StreamFrameScheduler scheduler(conn); + ShortHeader shortHeader( + ProtectionType::KeyPhaseZero, + getTestConnectionId(), + getNextPacketNum(conn, PacketNumberSpace::AppData)); + RegularQuicPacketBuilder builder( + conn.udpSendPacketLen, + std::move(shortHeader), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + builder.encodePacketHeader(); + scheduler.writeStreams(builder); + auto packet = std::move(builder).buildPacket().packet; + EXPECT_EQ(1, packet.frames.size()); + auto& writeStreamFrame = *packet.frames[0].asWriteStreamFrame(); + EXPECT_EQ(highPriStream->id, writeStreamFrame.streamId); + EXPECT_FALSE(lowPriStream->lossBuffer.empty()); +} + INSTANTIATE_TEST_CASE_P( QuicPacketSchedulerTests, QuicPacketSchedulerTest, diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index a76be1504..737286ae9 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -147,9 +147,8 @@ void dropPackets(QuicServerConnectionState& conn) { }), std::move(*itr->second)); stream->retransmissionBuffer.erase(itr); - if (conn.streamManager->lossStreams().count(streamFrame->streamId) == 0) { - conn.streamManager->addLoss(streamFrame->streamId); - } + conn.streamManager->updateWritableStreams(*stream); + conn.streamManager->updateLossStreams(*stream); } } conn.outstandings.packets.clear(); diff --git a/quic/fizz/client/test/QuicClientTransportTest.cpp b/quic/fizz/client/test/QuicClientTransportTest.cpp index 8f09ee5ba..13d86f2c0 100644 --- a/quic/fizz/client/test/QuicClientTransportTest.cpp +++ b/quic/fizz/client/test/QuicClientTransportTest.cpp @@ -4628,11 +4628,10 @@ TEST_F(QuicClientTransportAfterStartTest, ResetClearsPendingLoss) { RegularQuicWritePacket* forceLossPacket = CHECK_NOTNULL(findPacketWithStream(client->getNonConstConn(), streamId)); markPacketLoss(client->getNonConstConn(), *forceLossPacket, false); - auto& pendingLossStreams = client->getConn().streamManager->lossStreams(); - ASSERT_TRUE(pendingLossStreams.count(streamId) > 0); + ASSERT_TRUE(client->getConn().streamManager->hasLoss()); client->resetStream(streamId, GenericApplicationErrorCode::UNKNOWN); - ASSERT_TRUE(pendingLossStreams.count(streamId) == 0); + ASSERT_FALSE(client->getConn().streamManager->hasLoss()); } TEST_F(QuicClientTransportAfterStartTest, LossAfterResetStream) { @@ -4653,8 +4652,7 @@ TEST_F(QuicClientTransportAfterStartTest, LossAfterResetStream) { auto stream = CHECK_NOTNULL( client->getNonConstConn().streamManager->getStream(streamId)); ASSERT_TRUE(stream->lossBuffer.empty()); - auto& pendingLossStreams = client->getConn().streamManager->lossStreams(); - ASSERT_TRUE(pendingLossStreams.count(streamId) == 0); + ASSERT_FALSE(client->getConn().streamManager->hasLoss()); } TEST_F(QuicClientTransportAfterStartTest, SendResetAfterEom) { diff --git a/quic/loss/QuicLossFunctions.cpp b/quic/loss/QuicLossFunctions.cpp index 17dec9f97..0d5799ad9 100644 --- a/quic/loss/QuicLossFunctions.cpp +++ b/quic/loss/QuicLossFunctions.cpp @@ -113,7 +113,8 @@ void markPacketLoss( } stream->insertIntoLossBuffer(std::move(bufferItr->second)); stream->retransmissionBuffer.erase(bufferItr); - conn.streamManager->updateLossStreams(*stream); + conn.streamManager->updateWritableStreams(*stream); + conn.streamManager->addLoss(stream->id); break; } case QuicWriteFrame::Type::WriteCryptoFrame: { diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index d41f42f5b..4ce83eea3 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -239,7 +239,6 @@ bool QuicStreamManager::setStreamPriority( // If this stream is already in the writable or loss queus, update the // priority there. writableStreams_.updateIfExist(id, stream->priority); - lossStreams_.updateIfExist(id, stream->priority); writableDSRStreams_.updateIfExist(id, stream->priority); return true; } @@ -442,11 +441,11 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { writableStreams_.erase(streamId); writableDSRStreams_.erase(streamId); writableControlStreams_.erase(streamId); + removeLoss(streamId); blockedStreams_.erase(streamId); deliverableStreams_.erase(streamId); txStreams_.erase(streamId); windowUpdates_.erase(streamId); - lossStreams_.erase(streamId); stopSendingStreams_.erase(streamId); flowControlUpdated_.erase(streamId); if (it->second.isControl) { @@ -498,16 +497,6 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) { updateAppIdleState(); } -void QuicStreamManager::updateLossStreams(QuicStreamState& stream) { - if (stream.lossBuffer.empty()) { - // No-op if not present - lossStreams_.erase(stream.id); - } else { - // No-op if already inserted - lossStreams_.insertOrUpdate(stream.id, stream.priority); - } -} - void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) { updateHolBlockedTime(stream); if (stream.hasReadableData() || stream.streamReadError.has_value()) { @@ -518,11 +507,23 @@ void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) { } void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) { - if (stream.hasWritableDataOrBufMeta() && - !stream.streamWriteError.has_value()) { - stream.conn.streamManager->addWritable(stream); + if (stream.streamWriteError.has_value()) { + CHECK(stream.lossBuffer.empty()); + removeWritable(stream); + return; + } + if (stream.hasWritableData() || !stream.lossBuffer.empty()) { + addWritable(stream); } else { - stream.conn.streamManager->removeWritable(stream); + removeWritable(stream); + } + if (stream.isControl) { + return; + } + if (stream.hasWritableBufMeta()) { + addDSRWritable(stream); + } else { + removeDSRWritable(stream); } } diff --git a/quic/state/QuicStreamManager.h b/quic/state/QuicStreamManager.h index b25326342..0dbc14f6e 100644 --- a/quic/state/QuicStreamManager.h +++ b/quic/state/QuicStreamManager.h @@ -105,13 +105,6 @@ class QuicStreamManager { */ void updateWritableStreams(QuicStreamState& stream); - /* - * Update the current loss streams for the given stream state. This will - * either add or remove it from the collection of streams with outstanding - * loss. - */ - void updateLossStreams(QuicStreamState& stream); - /* * Find a open and active (we have created state for it) stream and return its * state. @@ -195,22 +188,26 @@ class QuicStreamManager { } } - auto& lossStreams() const { - return lossStreams_; - } - - // This should be only used in testing code. - void addLoss(StreamId streamId) { - auto stream = findStream(streamId); - if (stream) { - lossStreams_.insertOrUpdate(streamId, stream->priority); - } - } - - bool hasLoss() const { + FOLLY_NODISCARD bool hasLoss() const { return !lossStreams_.empty(); } + void removeLoss(StreamId id) { + lossStreams_.erase(id); + } + + void addLoss(StreamId id) { + lossStreams_.insert(id); + } + + void updateLossStreams(const QuicStreamState& stream) { + if (stream.lossBuffer.empty()) { + removeLoss(stream.id); + } else { + addLoss(stream.id); + } + } + /** * Update stream priority if the stream indicated by id exists, and the * passed in values are different from current priority. Return true if @@ -255,18 +252,17 @@ class QuicStreamManager { if (stream.isControl) { writableControlStreams_.insert(stream.id); } else { - bool hasPendingBufMeta = stream.hasWritableBufMeta(); - bool hasPendingWriteBuf = stream.hasWritableData(); - CHECK(hasPendingBufMeta || hasPendingWriteBuf); - if (hasPendingBufMeta) { - writableDSRStreams_.insertOrUpdate(stream.id, stream.priority); - } - if (hasPendingWriteBuf) { - writableStreams_.insertOrUpdate(stream.id, stream.priority); - } + CHECK(stream.hasWritableData() || !stream.lossBuffer.empty()); + writableStreams_.insertOrUpdate(stream.id, stream.priority); } } + void addDSRWritable(const QuicStreamState& stream) { + CHECK(!stream.isControl); + CHECK(stream.hasWritableBufMeta()); + writableDSRStreams_.insertOrUpdate(stream.id, stream.priority); + } + /* * Remove a writable stream id. */ @@ -275,10 +271,14 @@ class QuicStreamManager { writableControlStreams_.erase(stream.id); } else { writableStreams_.erase(stream.id); - writableDSRStreams_.erase(stream.id); } } + void removeDSRWritable(const QuicStreamState& stream) { + CHECK(!stream.isControl); + writableDSRStreams_.erase(stream.id); + } + /* * Clear the writable streams. */ @@ -846,8 +846,8 @@ class QuicStreamManager { // Streams that had their flow control updated folly::F14FastSet flowControlUpdated_; - // Data structure to keep track of stream that have detected lost data - PriorityQueue lossStreams_; + // Streams that have bytes in loss buffer + folly::F14FastSet lossStreams_; // Set of streams that have pending reads folly::F14FastSet readableStreams_; diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index b714121d2..87d43cd7d 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -327,10 +327,6 @@ struct QuicStreamState : public QuicStreamLike { return false; } - FOLLY_NODISCARD bool hasWritableDataOrBufMeta() const { - return hasWritableData() || hasWritableBufMeta(); - } - bool hasReadableData() const { return (readBuffer.size() > 0 && currentReadOffset == readBuffer.front().offset) || diff --git a/quic/state/stream/StreamStateFunctions.cpp b/quic/state/stream/StreamStateFunctions.cpp index 83d4093e8..cb102f35a 100644 --- a/quic/state/stream/StreamStateFunctions.cpp +++ b/quic/state/stream/StreamStateFunctions.cpp @@ -21,7 +21,7 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) { stream.streamWriteError = error; stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateWritableStreams(stream); - stream.conn.streamManager->updateLossStreams(stream); + stream.conn.streamManager->removeLoss(stream.id); } void onResetQuicStream(QuicStreamState& stream, const RstStreamFrame& frame) {