diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index f2d832e1b..5678863bd 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -33,13 +33,16 @@ QuicTransportBase::QuicTransportBase( drainTimeout_(this), readLooper_(new FunctionLooper( evb, - [this](bool /* ignored */) { invokeReadDataAndCallbacks(); })), + [this](bool /* ignored */) { invokeReadDataAndCallbacks(); }, + LooperType::ReadLooper)), peekLooper_(new FunctionLooper( evb, - [this](bool /* ignored */) { invokePeekDataAndCallbacks(); })), - writeLooper_(new FunctionLooper(evb, [this](bool fromTimer) { - pacedWriteDataToSocket(fromTimer); - })) { + [this](bool /* ignored */) { invokePeekDataAndCallbacks(); }, + LooperType::PeekLooper)), + writeLooper_(new FunctionLooper( + evb, + [this](bool fromTimer) { pacedWriteDataToSocket(fromTimer); }, + LooperType::WriteLooper)) { writeLooper_->setPacingFunction([this]() -> auto { if (isConnectionPaced(*conn_)) { conn_->congestionController->markPacerTimeoutScheduled(Clock::now()); @@ -677,6 +680,7 @@ void QuicTransportBase::invokeReadDataAndCallbacks() { readCallbacks_.erase(streamId); // if there is an error on the stream - it's not readable anymore, so // we cannot peek into it as well. + VLOG(10) << "Erasing peek callback for stream=" << streamId; self->conn_->streamManager->peekableStreams().erase(streamId); peekCallbacks_.erase(streamId); VLOG(10) << "invoking read error callbacks on stream=" << streamId << " " @@ -732,20 +736,27 @@ folly::Expected QuicTransportBase::setPeekCallback( return folly::unit; } -void QuicTransportBase::setPeekCallbackInternal( +folly::Expected +QuicTransportBase::setPeekCallbackInternal( StreamId id, PeekCallback* cb) noexcept { VLOG(4) << "Setting setPeekCallback for stream=" << id << " cb=" << cb << " " << *this; auto peekCbIt = peekCallbacks_.find(id); if (peekCbIt == peekCallbacks_.end()) { + // Don't allow initial setting of a nullptr callback. if (!cb) { - return; + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); } peekCbIt = peekCallbacks_.emplace(id, PeekCallbackData(cb)).first; } + if (!cb) { + VLOG(10) << "Resetting the peek callback to nullptr " + << "stream=" << id << " peekCb=" << peekCbIt->second.peekCb; + } peekCbIt->second.peekCb = cb; updatePeekLooper(); + return folly::unit; } folly::Expected QuicTransportBase::pausePeek( @@ -792,13 +803,18 @@ void QuicTransportBase::invokePeekDataAndCallbacks() { // to 0 we can execute "consume" calls that were done during "peek", for that, // we would need to keep stack of them. auto peekableListCopy = self->conn_->streamManager->peekableStreams(); + VLOG(10) << __func__ + << " peekableListCopy.size()=" << peekableListCopy.size(); for (const auto& streamId : peekableListCopy) { auto callback = self->peekCallbacks_.find(streamId); + // This is a likely bug. Need to think more on whether events can + // be dropped // remove streamId from list of peekable - as opposed to "read", "peek" is // only called once per streamId and not on every EVB loop until application // reads the data. self->conn_->streamManager->peekableStreams().erase(streamId); if (callback == self->peekCallbacks_.end()) { + VLOG(10) << " No peek callback for stream=" << streamId; continue; } auto peekCb = callback->second.peekCb; @@ -815,6 +831,8 @@ void QuicTransportBase::invokePeekDataAndCallbacks() { [&](StreamId id, const folly::Range& peekRange) { peekCb->onDataAvailable(id, peekRange); }); + } else { + VLOG(10) << "Not invoking peek callbacks on stream=" << streamId; } } } @@ -1006,14 +1024,26 @@ void QuicTransportBase::updatePeekLooper() { peekLooper_->stop(); return; } + VLOG(10) << "Updating peek looper, has " + << conn_->streamManager->peekableStreams().size() + << " peekable streams"; auto iter = std::find_if( conn_->streamManager->peekableStreams().begin(), conn_->streamManager->peekableStreams().end(), [& peekCallbacks = peekCallbacks_](StreamId s) { + VLOG(10) << "Checking stream=" << s; auto peekCb = peekCallbacks.find(s); if (peekCb == peekCallbacks.end()) { + VLOG(10) << "No peek callbacks for stream=" << s; return false; } + if (!peekCb->second.resumed) { + VLOG(10) << "peek callback for stream=" << s << " not resumed"; + } + + if (!peekCb->second.peekCb) { + VLOG(10) << "no peekCb in peekCb stream=" << s; + } return peekCb->second.peekCb && peekCb->second.resumed; }); if (iter != conn_->streamManager->peekableStreams().end()) { @@ -1844,28 +1874,48 @@ void QuicTransportBase::checkForClosedStream() { } auto itr = conn_->streamManager->closedStreams().begin(); while (itr != conn_->streamManager->closedStreams().end()) { - auto callbackIt = readCallbacks_.find(*itr); + // We may be in an active read cb when we close the stream + auto readCbIt = readCallbacks_.find(*itr); + if (readCbIt != readCallbacks_.end() && + readCbIt->second.readCb != nullptr && !readCbIt->second.deliveredEOM) { + VLOG(10) << "Not closing stream=" << *itr + << " because it has active read callback"; + ++itr; + continue; + } + // We may be in the active peek cb when we close the stream + auto peekCbIt = peekCallbacks_.find(*itr); + if (peekCbIt != peekCallbacks_.end() && + peekCbIt->second.peekCb != nullptr) { + VLOG(10) << "Not closing stream=" << *itr + << " because it has active peek callback"; + ++itr; + continue; + } // We might be in the process of delivering all the delivery callbacks for // the stream when we receive close stream. auto deliveryCbCount = deliveryCallbacks_.count(*itr); - if (deliveryCbCount == 0 && - (callbackIt == readCallbacks_.end() || - callbackIt->second.readCb == nullptr || - callbackIt->second.deliveredEOM)) { - FOLLY_MAYBE_UNUSED auto stream = conn_->streamManager->findStream(*itr); - QUIC_TRACE( - holb_time, - *conn_, - stream->id, - stream->totalHolbTime.count(), - stream->holbCount); - conn_->streamManager->removeClosedStream(*itr); - readCallbacks_.erase(*itr); - itr = conn_->streamManager->closedStreams().erase(itr); - } else { + if (deliveryCbCount > 0) { + VLOG(10) << "Not closing stream=" << *itr + << " because it is waiting for the delivery callback"; ++itr; + continue; } - } + + VLOG(10) << "Closing stream=" << *itr; + FOLLY_MAYBE_UNUSED auto stream = conn_->streamManager->findStream(*itr); + QUIC_TRACE( + holb_time, + *conn_, + stream->id, + stream->totalHolbTime.count(), + stream->holbCount); + conn_->streamManager->removeClosedStream(*itr); + readCallbacks_.erase(*itr); + peekCallbacks_.erase(*itr); + itr = conn_->streamManager->closedStreams().erase(itr); + } // while + if (closeState_ == CloseState::GRACEFUL_CLOSING && conn_->streamManager->streamCount() == 0) { closeImpl(folly::none); @@ -2032,6 +2082,8 @@ void QuicTransportBase::cancelAllAppCallbacks( // structure of read callbacks. // TODO: this approach will make the app unable to setReadCallback to // nullptr during the loop. Need to fix that. + // TODO: setReadCallback to nullptr closes the stream, so the app + // may just do that... auto readCallbacksCopy = readCallbacks_; for (auto& cb : readCallbacksCopy) { readCallbacks_.erase(cb.first); @@ -2040,6 +2092,7 @@ void QuicTransportBase::cancelAllAppCallbacks( cb.first, std::make_pair(err.first, folly::StringPiece(err.second))); } } + VLOG(4) << "Clearing " << peekCallbacks_.size() << " peek callbacks"; peekCallbacks_.clear(); dataExpiredCallbacks_.clear(); dataRejectedCallbacks_.clear(); diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index 7910c3c26..58fabaf06 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -465,7 +465,9 @@ class QuicTransportBase : public QuicSocket { folly::Expected setReadCallbackInternal( StreamId id, ReadCallback* cb) noexcept; - void setPeekCallbackInternal(StreamId id, PeekCallback* cb) noexcept; + folly::Expected setPeekCallbackInternal( + StreamId id, + PeekCallback* cb) noexcept; folly::Expected createStreamInternal( bool bidirectional); diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 8f4ddce7f..7eac924a6 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -155,7 +155,7 @@ class TestQuicTransport return lossTimeout_.getTimeRemaining(); } - void onReadData(const folly::SocketAddress&, NetworkData&& data) { + void onReadData(const folly::SocketAddress&, NetworkData&& data) override { if (!data.data) { return; } @@ -194,8 +194,8 @@ class TestQuicTransport continue; } appendDataToReadBuffer(*stream, std::move(buffer.second)); - conn_->streamManager->updatePeekableStreams(*stream); conn_->streamManager->updateReadableStreams(*stream); + conn_->streamManager->updatePeekableStreams(*stream); } } } @@ -276,6 +276,7 @@ class TestQuicTransport QuicStreamState* stream = conn_->streamManager->getStream(id); stream->streamReadError = ex; conn_->streamManager->updateReadableStreams(*stream); + conn_->streamManager->updatePeekableStreams(*stream); updateReadLooper(); } @@ -1847,17 +1848,18 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); transport->driveReadCallbacks(); - // Only read should be called. + // Only read should be called EXPECT_CALL(readCb, readAvailable(stream1)); - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); transport->driveReadCallbacks(); // Consume 5 bytes. transport->consume(stream1, 5); - // Only read should be called. + // Both peek and read should be called. + // Read - because it is called every time + // Peek - because the peekable range has changed EXPECT_CALL(readCb, readAvailable(stream1)); - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); transport->driveReadCallbacks(); // Read 10 bytes. @@ -1865,9 +1867,15 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { EXPECT_EQ("l stream d", data.first->moveToFbString().toStdString()); }); + // Both peek and read should be called. + // Read - because it is called every time + // Peek - because the peekable range has changed + EXPECT_CALL(readCb, readAvailable(stream1)); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); + transport->driveReadCallbacks(); + // Only read should be called. EXPECT_CALL(readCb, readAvailable(stream1)); - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); transport->driveReadCallbacks(); // Consume the rest of the data. @@ -1898,9 +1906,8 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { // Consume left part. transport->consume(stream1, buf1->computeChainDataLength()); - // Neither read nor peek should be called. - EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); + // Only peek should be called. + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); transport->driveReadCallbacks(); // Fill in the gap. @@ -1983,21 +1990,18 @@ TEST_F(QuicTransportImplTest, UpdatePeekableListEmptyListTest) { TEST_F(QuicTransportImplTest, UpdatePeekableListWithStreamErrorTest) { auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; - auto stream = transport->getStream(streamId); - // Add some data to the stream. transport->addDataToStream( streamId, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); - transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR); - // streamId is in the list. EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId)); + transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR); + // streamId is removed from the list after the call // because there is an error on the stream. - conn->streamManager->updatePeekableStreams(*stream); EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId)); } diff --git a/quic/common/FunctionLooper.cpp b/quic/common/FunctionLooper.cpp index 9fc440c32..f8bb1fedb 100644 --- a/quic/common/FunctionLooper.cpp +++ b/quic/common/FunctionLooper.cpp @@ -14,8 +14,9 @@ using namespace std::chrono_literals; FunctionLooper::FunctionLooper( folly::EventBase* evb, - folly::Function&& func) - : evb_(evb), func_(std::move(func)) {} + folly::Function&& func, + LooperType type) + : evb_(evb), func_(std::move(func)), type_(type) {} void FunctionLooper::setPacingTimer( TimerHighRes::SharedPtr pacingTimer) noexcept { @@ -32,9 +33,12 @@ void FunctionLooper::commonLoopBody(bool fromTimer) noexcept { SCOPE_EXIT { inLoopBody_ = false; }; + auto hasBeenRunning = running_; func_(fromTimer); // callback could cause us to stop ourselves. // Someone could have also called run() in the callback. + VLOG(10) << __func__ << ": " << type_ << " fromTimer=" << fromTimer + << " hasBeenRunning=" << hasBeenRunning << " running_=" << running_; if (!running_) { return; } @@ -60,16 +64,24 @@ void FunctionLooper::runLoopCallback() noexcept { } void FunctionLooper::run(bool thisIteration) noexcept { + VLOG(10) << __func__ << ": " << type_; running_ = true; // Caller can call run() in func_. But if we are in pacing mode, we should // prevent such loop. - if (inLoopBody_ || isLoopCallbackScheduled() || isScheduled()) { + if (pacingTimer_ && inLoopBody_) { + VLOG(4) << __func__ << ": " << type_ + << " in loop body and using pacing - not rescheduling"; + return; + } + if (isLoopCallbackScheduled() || isScheduled()) { + VLOG(10) << __func__ << ": " << type_ << " already scheduled"; return; } evb_->runInLoop(this, thisIteration); } void FunctionLooper::stop() noexcept { + VLOG(10) << __func__ << ": " << type_; running_ = false; cancelLoopCallback(); cancelTimeout(); @@ -80,12 +92,14 @@ bool FunctionLooper::isRunning() const { } void FunctionLooper::attachEventBase(folly::EventBase* evb) { + VLOG(10) << __func__ << ": " << type_; DCHECK(!evb_); DCHECK(evb && evb->isInEventBaseThread()); evb_ = evb; } void FunctionLooper::detachEventBase() { + VLOG(10) << __func__ << ": " << type_; DCHECK(evb_ && evb_->isInEventBaseThread()); stop(); cancelTimeout(); @@ -108,4 +122,22 @@ FunctionLooper::getTimerTickInterval() noexcept { } return folly::none; } + +std::ostream& operator<<(std::ostream& out, const LooperType& rhs) { + switch (rhs) { + case LooperType::ReadLooper: + out << "ReadLooper"; + break; + case LooperType::PeekLooper: + out << "PeekLooper"; + break; + case LooperType::WriteLooper: + out << "WriteLooper"; + break; + default: + out << "unknown"; + break; + } + return out; +} } // namespace quic diff --git a/quic/common/FunctionLooper.h b/quic/common/FunctionLooper.h index 362ce9917..ad986a198 100644 --- a/quic/common/FunctionLooper.h +++ b/quic/common/FunctionLooper.h @@ -7,12 +7,21 @@ */ #pragma once +#include #include #include #include namespace quic { +enum class LooperType : uint8_t { + ReadLooper = 1, + PeekLooper = 2, + WriteLooper = 3 +}; + +std::ostream& operator<<(std::ostream& /* out */, const LooperType& /*rhs*/); + /** * A loop callback that provides convenience functions for calling a functions * in multiple evb loops. Calling run() will cause the loop to start and stop() @@ -27,7 +36,8 @@ class FunctionLooper : public folly::EventBase::LoopCallback, explicit FunctionLooper( folly::EventBase* evb, - folly::Function&& func); + folly::Function&& func, + LooperType type); void setPacingTimer(TimerHighRes::SharedPtr pacingTimer) noexcept; @@ -82,5 +92,6 @@ class FunctionLooper : public folly::EventBase::LoopCallback, TimerHighRes::SharedPtr pacingTimer_; bool running_{false}; bool inLoopBody_{false}; + const LooperType type_; }; } // namespace quic diff --git a/quic/common/test/FunctionLooperTest.cpp b/quic/common/test/FunctionLooperTest.cpp index 5511acde5..70daaeb8a 100644 --- a/quic/common/test/FunctionLooperTest.cpp +++ b/quic/common/test/FunctionLooperTest.cpp @@ -21,7 +21,8 @@ TEST(FunctionLooperTest, LooperNotRunning) { EventBase evb; bool called = false; auto func = [&](bool) { called = true; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); evb.loopOnce(); EXPECT_FALSE(called); evb.loopOnce(); @@ -33,7 +34,8 @@ TEST(FunctionLooperTest, LooperStarted) { EventBase evb; bool called = false; auto func = [&](bool) { called = true; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looper->run(); EXPECT_TRUE(looper->isRunning()); evb.loopOnce(); @@ -47,7 +49,8 @@ TEST(FunctionLooperTest, LooperStopped) { EventBase evb; bool called = false; auto func = [&](bool) { called = true; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looper->run(); evb.loopOnce(); EXPECT_TRUE(called); @@ -62,7 +65,8 @@ TEST(FunctionLooperTest, LooperRestarted) { EventBase evb; bool called = false; auto func = [&](bool) { called = true; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looper->run(); evb.loopOnce(); EXPECT_TRUE(called); @@ -85,7 +89,8 @@ TEST(FunctionLooperTest, DestroyLooperDuringFunc) { called = true; *looperPtr = nullptr; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looperPtr = &looper; looper->run(); @@ -103,7 +108,8 @@ TEST(FunctionLooperTest, StopLooperDuringFunc) { called = true; (*looperPtr)->stop(); }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looperPtr = &looper; looper->run(); @@ -123,7 +129,8 @@ TEST(FunctionLooperTest, RunLooperDuringFunc) { called = true; (*looperPtr)->run(); }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looperPtr = &looper; looper->run(); @@ -138,7 +145,8 @@ TEST(FunctionLooperTest, DetachStopsLooper) { EventBase evb; bool called = false; auto func = [&](bool) { called = true; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looper->run(); EXPECT_TRUE(looper->isRunning()); looper->detachEventBase(); @@ -160,7 +168,8 @@ TEST(FunctionLooperTest, PacingOnce) { } return std::chrono::milliseconds::zero(); }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looper->setPacingTimer(std::move(pacingTimer)); looper->setPacingFunction(std::move(pacingFunc)); looper->run(); @@ -186,7 +195,8 @@ TEST(FunctionLooperTest, KeepPacing) { } return 3600000ms; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(func))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(func), LooperType::ReadLooper)); looper->setPacingTimer(std::move(pacingTimer)); looper->setPacingFunction(std::move(pacingFunc)); looper->run(); @@ -223,7 +233,8 @@ TEST(FunctionLooperTest, KeepPacing) { TEST(FunctionLooperTest, TimerTickSize) { EventBase evb; TimerHighRes::SharedPtr pacingTimer(TimerHighRes::newTimer(&evb, 123ms)); - FunctionLooper::Ptr looper(new FunctionLooper(&evb, [&](bool) {})); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, [&](bool) {}, LooperType::ReadLooper)); looper->setPacingTimer(std::move(pacingTimer)); EXPECT_EQ(123ms, looper->getTimerTickInterval()); } @@ -231,7 +242,8 @@ TEST(FunctionLooperTest, TimerTickSize) { TEST(FunctionLooperTest, TimerTickSizeAfterNewEvb) { EventBase evb; TimerHighRes::SharedPtr pacingTimer(TimerHighRes::newTimer(&evb, 123ms)); - FunctionLooper::Ptr looper(new FunctionLooper(&evb, [&](bool) {})); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, [&](bool) {}, LooperType::ReadLooper)); looper->setPacingTimer(std::move(pacingTimer)); EXPECT_EQ(123ms, looper->getTimerTickInterval()); looper->detachEventBase(); @@ -252,7 +264,8 @@ TEST(FunctionLooperTest, NoLoopCallbackInPacingMode) { } }; auto pacingFunc = [&]() { return 3600000ms; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(runFunc))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(runFunc), LooperType::ReadLooper)); looper->setPacingTimer(std::move(pacingTimer)); looper->setPacingFunction(std::move(pacingFunc)); // bootstrap the looper @@ -278,7 +291,8 @@ TEST(FunctionLooperTest, RunConditions) { (*looperPtr)->run(); }; auto pacingFunc = [&]() { return 3600000ms; }; - FunctionLooper::Ptr looper(new FunctionLooper(&evb, std::move(runFunc))); + FunctionLooper::Ptr looper( + new FunctionLooper(&evb, std::move(runFunc), LooperType::ReadLooper)); looperPtr = &looper; looper->setPacingTimer(std::move(pacingTimer)); looper->setPacingFunction(std::move(pacingFunc)); diff --git a/quic/state/QPRFunctions.cpp b/quic/state/QPRFunctions.cpp index 1a28c8acc..6d389d4df 100644 --- a/quic/state/QPRFunctions.cpp +++ b/quic/state/QPRFunctions.cpp @@ -69,6 +69,7 @@ void shrinkReadBuffer(QuicStreamState* stream) { updateFlowControlOnRead(*stream, lastReadOffset, Clock::now()); // may become readable after shrink stream->conn.streamManager->updateReadableStreams(*stream); + stream->conn.streamManager->updatePeekableStreams(*stream); } } // namespace diff --git a/quic/state/QuicStreamFunctions.cpp b/quic/state/QuicStreamFunctions.cpp index 1596cdcef..5708e3d0d 100644 --- a/quic/state/QuicStreamFunctions.cpp +++ b/quic/state/QuicStreamFunctions.cpp @@ -272,6 +272,7 @@ std::pair readDataFromQuicStream( stream.currentReadOffset += 1; } stream.conn.streamManager->updateReadableStreams(stream); + stream.conn.streamManager->updatePeekableStreams(stream); return std::make_pair(nullptr, true); } @@ -288,6 +289,7 @@ std::pair readDataFromQuicStream( stream.currentReadOffset += 1; } stream.conn.streamManager->updateReadableStreams(stream); + stream.conn.streamManager->updatePeekableStreams(stream); return std::make_pair(std::move(data), eof); } @@ -315,6 +317,7 @@ void consumeDataFromQuicStream(QuicStreamState& stream, uint64_t amount) { stream.currentReadOffset++; } stream.conn.streamManager->updateReadableStreams(stream); + stream.conn.streamManager->updatePeekableStreams(stream); return; } @@ -330,6 +333,7 @@ void consumeDataFromQuicStream(QuicStreamState& stream, uint64_t amount) { stream.currentReadOffset += 1; } stream.conn.streamManager->updateReadableStreams(stream); + stream.conn.streamManager->updatePeekableStreams(stream); } bool allBytesTillFinAcked(const QuicStreamState& stream) { diff --git a/quic/state/stream/StreamOpenHandlers-inl.h b/quic/state/stream/StreamOpenHandlers-inl.h index 69a4e7587..d24c793a3 100644 --- a/quic/state/stream/StreamOpenHandlers-inl.h +++ b/quic/state/stream/StreamOpenHandlers-inl.h @@ -31,6 +31,7 @@ Handler:: } } stream.conn.streamManager->updateReadableStreams(stream); + stream.conn.streamManager->updatePeekableStreams(stream); } inline void diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index ef2faf7e7..36d40c465 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -1282,7 +1282,7 @@ TEST_F(QuicStreamFunctionsTest, UpdatesLastHolbTime) { } TEST_F(QuicStreamFunctionsTest, HolbTimingUpdateReadingListIdempotentWrtHolb) { - // test that calling uRL consequtevily (without new data or readsd) + // test that calling uRL in succession (without new data or readsd) // does not affect the HOLB state auto stream = conn.streamManager->createNextBidirectionalStream().value(); auto buf1 = IOBuf::copyBuffer("just");