diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 5b23feb44..3d1fe5205 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1122,6 +1122,7 @@ folly::Expected, LocalErrorCode> QuicTransportBase::read( SCOPE_EXIT { checkForClosedStream(); updateReadLooper(); + updatePeekLooper(); // read can affect "peek" API updateWriteLooper(true); }; try { @@ -1236,6 +1237,7 @@ folly:: FOLLY_MAYBE_UNUSED auto self = sharedGuard(); SCOPE_EXIT { checkForClosedStream(); + updatePeekLooper(); updateReadLooper(); // consume may affect "read" API updateWriteLooper(true); }; @@ -1444,8 +1446,8 @@ void QuicTransportBase::onNetworkData( FOLLY_MAYBE_UNUSED auto self = sharedGuard(); SCOPE_EXIT { checkForClosedStream(); - updatePeekLooper(); updateReadLooper(); + updatePeekLooper(); updateWriteLooper(true); }; try { diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index f5107523c..79feb06a9 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -194,6 +194,7 @@ class TestQuicTransport continue; } appendDataToReadBuffer(*stream, std::move(buffer.second)); + conn_->streamManager->updatePeekableStreams(*stream); conn_->streamManager->updateReadableStreams(*stream); } } @@ -1632,123 +1633,6 @@ TEST_F(QuicTransportImplTest, PeekCallbackDataAvailable) { transport.reset(); } -TEST_F(QuicTransportImplTest, PeekCallbackDataAvailableWithGap) { - InSequence enforceOrder; - - auto stream1 = transport->createBidirectionalStream().value(); - - MockPeekCallback peekCb; - MockReadCallback readCb; - - transport->setPeekCallback(stream1, &peekCb); - transport->setReadCallback(stream1, &readCb); - - transport->addDataToStream( - stream1, - StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10)); - - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); - EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); - transport->driveReadCallbacks(); - - transport.reset(); -} - -TEST_F(QuicTransportImplTest, PeekSetReadCallback) { - InSequence enforceOrder; - - auto stream1 = transport->createBidirectionalStream().value(); - - MockPeekCallback peekCb; - MockReadCallback readCb; - - transport->setPeekCallback(stream1, &peekCb); - - transport->addDataToStream( - stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); - - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)) - .WillOnce(Invoke([&](StreamId id, const folly::Range&) { - transport->setReadCallback(id, &readCb); - })); - transport->driveReadCallbacks(); - - // Should raise readAvailable on next evb loop pass. - EXPECT_CALL(readCb, readAvailable(stream1)); - transport->driveReadCallbacks(); - - transport.reset(); -} - -TEST_F(QuicTransportImplTest, PeekConsumeStreamPreface) { - InSequence enforceOrder; - - auto stream1 = transport->createBidirectionalStream().value(); - - MockPeekCallback peekCb; - MockReadCallback readCb; - - transport->setPeekCallback(stream1, &peekCb); - - auto streamPreface = folly::IOBuf::copyBuffer("[i'm a stream preface]"); - const uint64_t streamPrefaceLen = streamPreface->computeChainDataLength(); - transport->addDataToStream(stream1, StreamBuffer(streamPreface->clone(), 0)); - transport->addDataToStream( - stream1, - StreamBuffer( - folly::IOBuf::copyBuffer("here come stream data"), streamPrefaceLen)); - - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)) - .WillOnce( - Invoke([&](StreamId id, const folly::Range& range) { - EXPECT_EQ(id, stream1); - EXPECT_EQ(range.size(), 1); - auto bufClone = range[0].data.front()->clone(); - EXPECT_EQ( - "[i'm a stream preface]here come stream data", - bufClone->moveToFbString().toStdString()); - - transport->consume(stream1, streamPrefaceLen); - transport->setPeekCallback(id, nullptr); - transport->setReadCallback(id, &readCb); - })); - transport->driveReadCallbacks(); - - // Should raise readAvailable on next evb loop pass. - EXPECT_CALL(readCb, readAvailable(stream1)).WillOnce(Invoke([&](StreamId id) { - EXPECT_EQ(id, stream1); - auto res = transport->read(id, 0); - EXPECT_FALSE(res.hasError()); - auto& readBytes = res->first; - EXPECT_EQ( - "here come stream data", readBytes->moveToFbString().toStdString()); - })); - transport->driveReadCallbacks(); - - transport.reset(); -} - -TEST_F(QuicTransportImplTest, PeekCallbackDataAvailableWithRead) { - InSequence enforceOrder; - - auto stream1 = transport->createBidirectionalStream().value(); - - MockReadCallback readCb1; - MockPeekCallback peekCb1; - - transport->setReadCallback(stream1, &readCb1); - transport->setPeekCallback(stream1, &peekCb1); - - transport->addDataToStream( - stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); - - EXPECT_CALL(peekCb1, onDataAvailable(stream1, _)); - EXPECT_CALL(readCb1, readAvailable(stream1)); - transport->driveReadCallbacks(); - - transport.reset(); -} - TEST_F(QuicTransportImplTest, PeekCallbackUnsetAll) { auto stream1 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value(); @@ -1947,6 +1831,7 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { InSequence enforceOrder; auto stream1 = transport->createBidirectionalStream().value(); + auto readData = folly::IOBuf::copyBuffer("actual stream data"); MockPeekCallback peekCb; MockReadCallback readCb; @@ -1958,21 +1843,21 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); // Both peek and read should be called. - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); EXPECT_CALL(readCb, readAvailable(stream1)); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); transport->driveReadCallbacks(); // Only read should be called. - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); EXPECT_CALL(readCb, readAvailable(stream1)); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); transport->driveReadCallbacks(); // Consume 5 bytes. transport->consume(stream1, 5); // Only read should be called. - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); EXPECT_CALL(readCb, readAvailable(stream1)); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); transport->driveReadCallbacks(); // Read 10 bytes. @@ -1981,8 +1866,8 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { }); // Only read should be called. - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); EXPECT_CALL(readCb, readAvailable(stream1)); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); transport->driveReadCallbacks(); // Consume the rest of the data. @@ -2006,16 +1891,16 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { buf1->computeChainDataLength() + buf2->computeChainDataLength())); // Both peek and read should be called. - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); EXPECT_CALL(readCb, readAvailable(stream1)); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); transport->driveReadCallbacks(); // Consume left part. transport->consume(stream1, buf1->computeChainDataLength()); // Neither read nor peek should be called. - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); transport->driveReadCallbacks(); // Fill in the gap. @@ -2023,8 +1908,8 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { stream1, StreamBuffer(buf2->clone(), buf1->computeChainDataLength())); // Both peek and read should be called. - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); EXPECT_CALL(readCb, readAvailable(stream1)); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); transport->driveReadCallbacks(); // Read the rest of the buffer. @@ -2035,27 +1920,13 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { }); // Neither read nor peek should be called. - EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); transport->driveReadCallbacks(); transport.reset(); } -TEST_F(QuicTransportImplTest, UpdatePeekableListWithRead) { - auto streamId = transport->createBidirectionalStream().value(); - const auto& conn = transport->transportConn; - - EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId)); - - transport->addDataToStream( - streamId, - StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); - transport->driveReadCallbacks(); - - EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId)); -} - TEST_F(QuicTransportImplTest, UpdatePeekableListNoDataTest) { auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; @@ -2112,17 +1983,21 @@ TEST_F(QuicTransportImplTest, UpdatePeekableListEmptyListTest) { TEST_F(QuicTransportImplTest, UpdatePeekableListWithStreamErrorTest) { auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; + auto stream = transport->getStream(streamId); // Add some data to the stream. transport->addDataToStream( streamId, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); + transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR); + // streamId is in the list. EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId)); - transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR); - + // streamId is removed from the list after the call + // because there is an error on the stream. + conn->streamManager->updatePeekableStreams(*stream); EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId)); } diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index 3c93c8763..2756c955c 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -358,7 +358,6 @@ void QuicStreamManager::updateLossStreams(QuicStreamState& stream) { void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) { updateHolBlockedTime(stream); - updatePeekableStreams(stream); auto itr = readableStreams_.find(stream.id); if (!stream.hasReadableData() && !stream.streamReadError.hasValue()) { if (itr != readableStreams_.end()) {