1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Fix peek callback

Summary:
Peek callback wasn't raised on data arrival. The fix is to:
* always update peek looper before read looper
* always update `peekableList` together with `readableList`

Differential Revision: D14757064

fbshipit-source-id: bb1e43762f54d024050eced425351f03a0cffd9f
This commit is contained in:
Konstantin Tsoy
2019-05-16 12:19:33 -07:00
committed by Facebook Github Bot
parent ab610d7138
commit ea0fb07be1
3 changed files with 143 additions and 19 deletions

View File

@@ -194,7 +194,6 @@ class TestQuicTransport
continue;
}
appendDataToReadBuffer(*stream, std::move(buffer.second));
conn_->streamManager->updatePeekableStreams(*stream);
conn_->streamManager->updateReadableStreams(*stream);
}
}
@@ -1633,6 +1632,123 @@ TEST_F(QuicTransportImplTest, PeekCallbackDataAvailable) {
transport.reset();
}
TEST_F(QuicTransportImplTest, PeekCallbackDataAvailableWithGap) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
MockPeekCallback peekCb;
MockReadCallback readCb;
transport->setPeekCallback(stream1, &peekCb);
transport->setReadCallback(stream1, &readCb);
transport->addDataToStream(
stream1,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
transport->driveReadCallbacks();
transport.reset();
}
TEST_F(QuicTransportImplTest, PeekSetReadCallback) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
MockPeekCallback peekCb;
MockReadCallback readCb;
transport->setPeekCallback(stream1, &peekCb);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _))
.WillOnce(Invoke([&](StreamId id, const folly::Range<PeekIterator>&) {
transport->setReadCallback(id, &readCb);
}));
transport->driveReadCallbacks();
// Should raise readAvailable on next evb loop pass.
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
transport.reset();
}
TEST_F(QuicTransportImplTest, PeekConsumeStreamPreface) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
MockPeekCallback peekCb;
MockReadCallback readCb;
transport->setPeekCallback(stream1, &peekCb);
auto streamPreface = folly::IOBuf::copyBuffer("[i'm a stream preface]");
const uint64_t streamPrefaceLen = streamPreface->computeChainDataLength();
transport->addDataToStream(stream1, StreamBuffer(streamPreface->clone(), 0));
transport->addDataToStream(
stream1,
StreamBuffer(
folly::IOBuf::copyBuffer("here come stream data"), streamPrefaceLen));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _))
.WillOnce(
Invoke([&](StreamId id, const folly::Range<PeekIterator>& range) {
EXPECT_EQ(id, stream1);
EXPECT_EQ(range.size(), 1);
auto bufClone = range[0].data.front()->clone();
EXPECT_EQ(
"[i'm a stream preface]here come stream data",
bufClone->moveToFbString().toStdString());
transport->consume(stream1, streamPrefaceLen);
transport->setPeekCallback(id, nullptr);
transport->setReadCallback(id, &readCb);
}));
transport->driveReadCallbacks();
// Should raise readAvailable on next evb loop pass.
EXPECT_CALL(readCb, readAvailable(stream1)).WillOnce(Invoke([&](StreamId id) {
EXPECT_EQ(id, stream1);
auto res = transport->read(id, 0);
EXPECT_FALSE(res.hasError());
auto& readBytes = res->first;
EXPECT_EQ(
"here come stream data", readBytes->moveToFbString().toStdString());
}));
transport->driveReadCallbacks();
transport.reset();
}
TEST_F(QuicTransportImplTest, PeekCallbackDataAvailableWithRead) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
MockReadCallback readCb1;
MockPeekCallback peekCb1;
transport->setReadCallback(stream1, &readCb1);
transport->setPeekCallback(stream1, &peekCb1);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->driveReadCallbacks();
transport.reset();
}
TEST_F(QuicTransportImplTest, PeekCallbackUnsetAll) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
@@ -1831,7 +1947,6 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
auto readData = folly::IOBuf::copyBuffer("actual stream data");
MockPeekCallback peekCb;
MockReadCallback readCb;
@@ -1843,21 +1958,21 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) {
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
// Both peek and read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
// Only read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
// Consume 5 bytes.
transport->consume(stream1, 5);
// Only read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
// Read 10 bytes.
@@ -1866,8 +1981,8 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) {
});
// Only read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
// Consume the rest of the data.
@@ -1891,16 +2006,16 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) {
buf1->computeChainDataLength() + buf2->computeChainDataLength()));
// Both peek and read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
// Consume left part.
transport->consume(stream1, buf1->computeChainDataLength());
// Neither read nor peek should be called.
EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
transport->driveReadCallbacks();
// Fill in the gap.
@@ -1908,8 +2023,8 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) {
stream1, StreamBuffer(buf2->clone(), buf1->computeChainDataLength()));
// Both peek and read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
// Read the rest of the buffer.
@@ -1920,13 +2035,27 @@ TEST_F(QuicTransportImplTest, PeekConsumeReadTest) {
});
// Neither read nor peek should be called.
EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
transport->driveReadCallbacks();
transport.reset();
}
TEST_F(QuicTransportImplTest, UpdatePeekableListWithRead) {
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId));
transport->addDataToStream(
streamId,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->driveReadCallbacks();
EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId));
}
TEST_F(QuicTransportImplTest, UpdatePeekableListNoDataTest) {
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
@@ -1983,21 +2112,17 @@ TEST_F(QuicTransportImplTest, UpdatePeekableListEmptyListTest) {
TEST_F(QuicTransportImplTest, UpdatePeekableListWithStreamErrorTest) {
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
// Add some data to the stream.
transport->addDataToStream(
streamId,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
// streamId is in the list.
EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId));
// streamId is removed from the list after the call
// because there is an error on the stream.
conn->streamManager->updatePeekableStreams(*stream);
transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId));
}