diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 3d1fe5205..5b23feb44 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1122,7 +1122,6 @@ folly::Expected, LocalErrorCode> QuicTransportBase::read( SCOPE_EXIT { checkForClosedStream(); updateReadLooper(); - updatePeekLooper(); // read can affect "peek" API updateWriteLooper(true); }; try { @@ -1237,7 +1236,6 @@ folly:: FOLLY_MAYBE_UNUSED auto self = sharedGuard(); SCOPE_EXIT { checkForClosedStream(); - updatePeekLooper(); updateReadLooper(); // consume may affect "read" API updateWriteLooper(true); }; @@ -1446,8 +1444,8 @@ void QuicTransportBase::onNetworkData( FOLLY_MAYBE_UNUSED auto self = sharedGuard(); SCOPE_EXIT { checkForClosedStream(); - updateReadLooper(); updatePeekLooper(); + updateReadLooper(); updateWriteLooper(true); }; try { diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 79feb06a9..f5107523c 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -194,7 +194,6 @@ class TestQuicTransport continue; } appendDataToReadBuffer(*stream, std::move(buffer.second)); - conn_->streamManager->updatePeekableStreams(*stream); conn_->streamManager->updateReadableStreams(*stream); } } @@ -1633,6 +1632,123 @@ TEST_F(QuicTransportImplTest, PeekCallbackDataAvailable) { transport.reset(); } +TEST_F(QuicTransportImplTest, PeekCallbackDataAvailableWithGap) { + InSequence enforceOrder; + + auto stream1 = transport->createBidirectionalStream().value(); + + MockPeekCallback peekCb; + MockReadCallback readCb; + + transport->setPeekCallback(stream1, &peekCb); + transport->setReadCallback(stream1, &readCb); + + transport->addDataToStream( + stream1, + StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10)); + + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); + EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); + transport->driveReadCallbacks(); + + transport.reset(); +} + +TEST_F(QuicTransportImplTest, PeekSetReadCallback) { + InSequence enforceOrder; + + auto stream1 = transport->createBidirectionalStream().value(); + + MockPeekCallback peekCb; + MockReadCallback readCb; + + transport->setPeekCallback(stream1, &peekCb); + + transport->addDataToStream( + stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); + + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)) + .WillOnce(Invoke([&](StreamId id, const folly::Range&) { + transport->setReadCallback(id, &readCb); + })); + transport->driveReadCallbacks(); + + // Should raise readAvailable on next evb loop pass. + EXPECT_CALL(readCb, readAvailable(stream1)); + transport->driveReadCallbacks(); + + transport.reset(); +} + +TEST_F(QuicTransportImplTest, PeekConsumeStreamPreface) { + InSequence enforceOrder; + + auto stream1 = transport->createBidirectionalStream().value(); + + MockPeekCallback peekCb; + MockReadCallback readCb; + + transport->setPeekCallback(stream1, &peekCb); + + auto streamPreface = folly::IOBuf::copyBuffer("[i'm a stream preface]"); + const uint64_t streamPrefaceLen = streamPreface->computeChainDataLength(); + transport->addDataToStream(stream1, StreamBuffer(streamPreface->clone(), 0)); + transport->addDataToStream( + stream1, + StreamBuffer( + folly::IOBuf::copyBuffer("here come stream data"), streamPrefaceLen)); + + EXPECT_CALL(peekCb, onDataAvailable(stream1, _)) + .WillOnce( + Invoke([&](StreamId id, const folly::Range& range) { + EXPECT_EQ(id, stream1); + EXPECT_EQ(range.size(), 1); + auto bufClone = range[0].data.front()->clone(); + EXPECT_EQ( + "[i'm a stream preface]here come stream data", + bufClone->moveToFbString().toStdString()); + + transport->consume(stream1, streamPrefaceLen); + transport->setPeekCallback(id, nullptr); + transport->setReadCallback(id, &readCb); + })); + transport->driveReadCallbacks(); + + // Should raise readAvailable on next evb loop pass. + EXPECT_CALL(readCb, readAvailable(stream1)).WillOnce(Invoke([&](StreamId id) { + EXPECT_EQ(id, stream1); + auto res = transport->read(id, 0); + EXPECT_FALSE(res.hasError()); + auto& readBytes = res->first; + EXPECT_EQ( + "here come stream data", readBytes->moveToFbString().toStdString()); + })); + transport->driveReadCallbacks(); + + transport.reset(); +} + +TEST_F(QuicTransportImplTest, PeekCallbackDataAvailableWithRead) { + InSequence enforceOrder; + + auto stream1 = transport->createBidirectionalStream().value(); + + MockReadCallback readCb1; + MockPeekCallback peekCb1; + + transport->setReadCallback(stream1, &readCb1); + transport->setPeekCallback(stream1, &peekCb1); + + transport->addDataToStream( + stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); + + EXPECT_CALL(peekCb1, onDataAvailable(stream1, _)); + EXPECT_CALL(readCb1, readAvailable(stream1)); + transport->driveReadCallbacks(); + + transport.reset(); +} + TEST_F(QuicTransportImplTest, PeekCallbackUnsetAll) { auto stream1 = transport->createBidirectionalStream().value(); auto stream2 = transport->createBidirectionalStream().value(); @@ -1831,7 +1947,6 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { InSequence enforceOrder; auto stream1 = transport->createBidirectionalStream().value(); - auto readData = folly::IOBuf::copyBuffer("actual stream data"); MockPeekCallback peekCb; MockReadCallback readCb; @@ -1843,21 +1958,21 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); // Both peek and read should be called. - EXPECT_CALL(readCb, readAvailable(stream1)); EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); + EXPECT_CALL(readCb, readAvailable(stream1)); transport->driveReadCallbacks(); // Only read should be called. - EXPECT_CALL(readCb, readAvailable(stream1)); EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); + EXPECT_CALL(readCb, readAvailable(stream1)); transport->driveReadCallbacks(); // Consume 5 bytes. transport->consume(stream1, 5); // Only read should be called. - EXPECT_CALL(readCb, readAvailable(stream1)); EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); + EXPECT_CALL(readCb, readAvailable(stream1)); transport->driveReadCallbacks(); // Read 10 bytes. @@ -1866,8 +1981,8 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { }); // Only read should be called. - EXPECT_CALL(readCb, readAvailable(stream1)); EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); + EXPECT_CALL(readCb, readAvailable(stream1)); transport->driveReadCallbacks(); // Consume the rest of the data. @@ -1891,16 +2006,16 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { buf1->computeChainDataLength() + buf2->computeChainDataLength())); // Both peek and read should be called. - EXPECT_CALL(readCb, readAvailable(stream1)); EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); + EXPECT_CALL(readCb, readAvailable(stream1)); transport->driveReadCallbacks(); // Consume left part. transport->consume(stream1, buf1->computeChainDataLength()); // Neither read nor peek should be called. - EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); + EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); transport->driveReadCallbacks(); // Fill in the gap. @@ -1908,8 +2023,8 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { stream1, StreamBuffer(buf2->clone(), buf1->computeChainDataLength())); // Both peek and read should be called. - EXPECT_CALL(readCb, readAvailable(stream1)); EXPECT_CALL(peekCb, onDataAvailable(stream1, _)); + EXPECT_CALL(readCb, readAvailable(stream1)); transport->driveReadCallbacks(); // Read the rest of the buffer. @@ -1920,13 +2035,27 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) { }); // Neither read nor peek should be called. - EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0); + EXPECT_CALL(readCb, readAvailable(stream1)).Times(0); transport->driveReadCallbacks(); transport.reset(); } +TEST_F(QuicTransportImplTest, UpdatePeekableListWithRead) { + auto streamId = transport->createBidirectionalStream().value(); + const auto& conn = transport->transportConn; + + EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId)); + + transport->addDataToStream( + streamId, + StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); + transport->driveReadCallbacks(); + + EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId)); +} + TEST_F(QuicTransportImplTest, UpdatePeekableListNoDataTest) { auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; @@ -1983,21 +2112,17 @@ TEST_F(QuicTransportImplTest, UpdatePeekableListEmptyListTest) { TEST_F(QuicTransportImplTest, UpdatePeekableListWithStreamErrorTest) { auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; - auto stream = transport->getStream(streamId); // Add some data to the stream. transport->addDataToStream( streamId, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0)); - transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR); - // streamId is in the list. EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId)); - // streamId is removed from the list after the call - // because there is an error on the stream. - conn->streamManager->updatePeekableStreams(*stream); + transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR); + EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId)); } diff --git a/quic/state/QuicStreamManager.cpp b/quic/state/QuicStreamManager.cpp index 2756c955c..3c93c8763 100644 --- a/quic/state/QuicStreamManager.cpp +++ b/quic/state/QuicStreamManager.cpp @@ -358,6 +358,7 @@ void QuicStreamManager::updateLossStreams(QuicStreamState& stream) { void QuicStreamManager::updateReadableStreams(QuicStreamState& stream) { updateHolBlockedTime(stream); + updatePeekableStreams(stream); auto itr = readableStreams_.find(stream.id); if (!stream.hasReadableData() && !stream.streamReadError.hasValue()) { if (itr != readableStreams_.end()) {