mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-08 09:42:06 +03:00
Add peekError into PeekCallback
Summary: This diff adds error callback API during the time PeekCallback() is applied, following D9724616, D988728, D15312250, design doc https://fb.quip.com/8P9WAaF43VOn The peekError() API implementation is added into unidirectional stream dispatcher, class HQUnidirStreamDispatcher. Reviewed By: mjoras Differential Revision: D27514260 fbshipit-source-id: b65cc0e46a5d520f46f9dc218c9a65be99e6df55
This commit is contained in:
committed by
Facebook GitHub Bot
parent
5e272188ff
commit
3ea632ee61
@@ -614,7 +614,17 @@ class QuicSocket {
|
|||||||
virtual void onDataAvailable(
|
virtual void onDataAvailable(
|
||||||
StreamId id,
|
StreamId id,
|
||||||
const folly::Range<PeekIterator>& peekData) noexcept = 0;
|
const folly::Range<PeekIterator>& peekData) noexcept = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called from the transport layer during peek time when there is an error
|
||||||
|
* on the stream.
|
||||||
|
*/
|
||||||
|
virtual void peekError(
|
||||||
|
StreamId id,
|
||||||
|
std::pair<QuicErrorCode, folly::Optional<folly::StringPiece>>
|
||||||
|
error) noexcept = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
virtual folly::Expected<folly::Unit, LocalErrorCode> setPeekCallback(
|
virtual folly::Expected<folly::Unit, LocalErrorCode> setPeekCallback(
|
||||||
StreamId id,
|
StreamId id,
|
||||||
PeekCallback* cb) = 0;
|
PeekCallback* cb) = 0;
|
||||||
|
@@ -989,7 +989,13 @@ void QuicTransportBase::invokePeekDataAndCallbacks() {
|
|||||||
}
|
}
|
||||||
auto peekCb = callback->second.peekCb;
|
auto peekCb = callback->second.peekCb;
|
||||||
auto stream = conn_->streamManager->getStream(streamId);
|
auto stream = conn_->streamManager->getStream(streamId);
|
||||||
if (peekCb && !stream->streamReadError && stream->hasPeekableData()) {
|
if (peekCb && stream->streamReadError) {
|
||||||
|
VLOG(10) << "invoking peek error callbacks on stream=" << streamId << " "
|
||||||
|
<< *this;
|
||||||
|
peekCb->peekError(
|
||||||
|
streamId, std::make_pair(*stream->streamReadError, folly::none));
|
||||||
|
} else if (
|
||||||
|
peekCb && !stream->streamReadError && stream->hasPeekableData()) {
|
||||||
VLOG(10) << "invoking peek callbacks on stream=" << streamId << " "
|
VLOG(10) << "invoking peek callbacks on stream=" << streamId << " "
|
||||||
<< *this;
|
<< *this;
|
||||||
|
|
||||||
|
@@ -63,6 +63,14 @@ class MockPeekCallback : public QuicSocket::PeekCallback {
|
|||||||
,
|
,
|
||||||
onDataAvailable,
|
onDataAvailable,
|
||||||
void(StreamId, const folly::Range<PeekIterator>&));
|
void(StreamId, const folly::Range<PeekIterator>&));
|
||||||
|
GMOCK_METHOD2_(
|
||||||
|
,
|
||||||
|
noexcept,
|
||||||
|
,
|
||||||
|
peekError,
|
||||||
|
void(
|
||||||
|
StreamId,
|
||||||
|
std::pair<QuicErrorCode, folly::Optional<folly::StringPiece>>));
|
||||||
};
|
};
|
||||||
|
|
||||||
class MockWriteCallback : public QuicSocket::WriteCallback {
|
class MockWriteCallback : public QuicSocket::WriteCallback {
|
||||||
|
@@ -339,6 +339,8 @@ class TestQuicTransport
|
|||||||
stream->streamReadError = ex;
|
stream->streamReadError = ex;
|
||||||
conn_->streamManager->updateReadableStreams(*stream);
|
conn_->streamManager->updateReadableStreams(*stream);
|
||||||
conn_->streamManager->updatePeekableStreams(*stream);
|
conn_->streamManager->updatePeekableStreams(*stream);
|
||||||
|
// peekableStreams is updated to contain streams with streamReadError
|
||||||
|
updatePeekLooper();
|
||||||
updateReadLooper();
|
updateReadLooper();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2691,6 +2693,24 @@ TEST_F(QuicTransportImplTest, PeekCallbackDataAvailable) {
|
|||||||
transport.reset();
|
transport.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(QuicTransportImplTest, PeekError) {
|
||||||
|
auto stream1 = transport->createBidirectionalStream().value();
|
||||||
|
|
||||||
|
NiceMock<MockPeekCallback> peekCb1;
|
||||||
|
transport->setPeekCallback(stream1, &peekCb1);
|
||||||
|
|
||||||
|
transport->addDataToStream(
|
||||||
|
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
|
||||||
|
transport->addStreamReadError(stream1, LocalErrorCode::STREAM_CLOSED);
|
||||||
|
|
||||||
|
EXPECT_CALL(
|
||||||
|
peekCb1, peekError(stream1, IsError(LocalErrorCode::STREAM_CLOSED)));
|
||||||
|
|
||||||
|
transport->driveReadCallbacks();
|
||||||
|
|
||||||
|
transport.reset();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(QuicTransportImplTest, PeekCallbackUnsetAll) {
|
TEST_F(QuicTransportImplTest, PeekCallbackUnsetAll) {
|
||||||
auto stream1 = transport->createBidirectionalStream().value();
|
auto stream1 = transport->createBidirectionalStream().value();
|
||||||
auto stream2 = transport->createBidirectionalStream().value();
|
auto stream2 = transport->createBidirectionalStream().value();
|
||||||
@@ -3057,9 +3077,9 @@ TEST_F(QuicTransportImplTest, UpdatePeekableListWithStreamErrorTest) {
|
|||||||
|
|
||||||
transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
|
transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
|
||||||
|
|
||||||
// streamId is removed from the list after the call
|
// peekableStreams is updated to allow stream with streamReadError.
|
||||||
// because there is an error on the stream.
|
// So the streamId shall be in the list
|
||||||
EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId));
|
EXPECT_EQ(1, conn->streamManager->peekableStreams().count(streamId));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(QuicTransportImplTest, SuccessfulPing) {
|
TEST_F(QuicTransportImplTest, SuccessfulPing) {
|
||||||
|
@@ -528,7 +528,9 @@ void QuicStreamManager::updateWritableStreams(QuicStreamState& stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void QuicStreamManager::updatePeekableStreams(QuicStreamState& stream) {
|
void QuicStreamManager::updatePeekableStreams(QuicStreamState& stream) {
|
||||||
if (stream.hasPeekableData() && !stream.streamReadError.has_value()) {
|
// In the PeekCallback, the API peekError() is added, so change the condition
|
||||||
|
// and allow streamReadError in the peekableStreams
|
||||||
|
if (stream.hasPeekableData() || stream.streamReadError.has_value()) {
|
||||||
peekableStreams_.emplace(stream.id);
|
peekableStreams_.emplace(stream.id);
|
||||||
} else {
|
} else {
|
||||||
peekableStreams_.erase(stream.id);
|
peekableStreams_.erase(stream.id);
|
||||||
|
Reference in New Issue
Block a user