1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-06 22:22:38 +03:00

StopSending Closes Ingress

Summary: - transport settings to enable the following behaviour: invoking stopSending() now drops buffered ingress data and closes the ingress path

Reviewed By: mjoras

Differential Revision: D40678864

fbshipit-source-id: 7dd5c7843909d5ac8cc309292b31d1d87b97a2f5
This commit is contained in:
Hani Damlaj
2022-11-14 18:06:47 -08:00
committed by Facebook GitHub Bot
parent 9b3909466b
commit c22ee0a8ab
6 changed files with 137 additions and 4 deletions

View File

@@ -20,6 +20,7 @@
#include <quic/state/QuicStreamFunctions.h>
#include <quic/state/QuicStreamUtilities.h>
#include <quic/state/stream/StreamReceiveHandlers.h>
#include <quic/state/stream/StreamSendHandlers.h>
#include <quic/state/test/Mocks.h>
#include <folly/io/async/test/MockAsyncUDPSocket.h>
@@ -384,6 +385,10 @@ class TestQuicTransport
return writeLooper_;
}
auto& readLooper() {
return readLooper_;
}
void unbindConnection() {}
void onReadError(const folly::AsyncSocketException&) noexcept {}
@@ -668,6 +673,116 @@ TEST_P(QuicTransportImplTestBase, IdleTimeoutStreamMaessage) {
}
TEST_P(QuicTransportImplTestBase, StopSendingClosesIngress) {
// update transport settings
auto transportSettings = transport->getTransportSettings();
transportSettings.dropIngressOnStopSending = true;
transport->setTransportSettings(transportSettings);
auto& streamManager = *transport->transportConn->streamManager;
auto unknownErrorCode = GenericApplicationErrorCode::UNKNOWN;
std::string ingressData = "some ingress stream data";
auto ingressDataLen = ingressData.size();
StreamId streamID;
QuicStreamState* stream;
// create bidirectional stream
streamID = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
transport->setReadCallback(streamID, &readCb1);
// add ingress & egress data to stream
transport->addDataToStream(
streamID, StreamBuffer(folly::IOBuf::copyBuffer(ingressData), 0));
transport->writeChain(
streamID, folly::IOBuf::copyBuffer("some egress stream data"), false);
transport->driveReadCallbacks();
stream = CHECK_NOTNULL(transport->getStream(streamID));
// check stream has readable data and SM is open
EXPECT_TRUE(stream->hasReadableData());
EXPECT_EQ(stream->sendState, StreamSendState::Open);
EXPECT_EQ(stream->recvState, StreamRecvState::Open);
// send stop sending to peer this and later invoking reset stream should not
// invoke ReadCallback::readError()
EXPECT_CALL(readCb1, readError(streamID, _)).Times(0);
transport->stopSending(streamID, GenericApplicationErrorCode::NO_ERROR);
// check that we've discarded any ingress data and ingress SM is closed
EXPECT_FALSE(stream->hasReadableData());
EXPECT_FALSE(streamManager.readableStreams().contains(streamID));
EXPECT_EQ(stream->sendState, StreamSendState::Open);
EXPECT_EQ(stream->recvState, StreamRecvState::Closed);
// suppose we tx a rst stream (and rx its corresponding ack), expect
// terminal state and queued in closed streams
transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR);
sendRstAckSMHandler(*stream);
EXPECT_TRUE(stream->inTerminalStates());
EXPECT_TRUE(streamManager.closedStreams().contains(streamID));
transport->driveReadCallbacks();
// now if we rx a rst_stream we should deliver ReadCallback::readError()
EXPECT_TRUE(streamManager.streamExists(streamID));
EXPECT_CALL(readCb1, readError(streamID, QuicError(unknownErrorCode)))
.Times(1);
receiveRstStreamSMHandler(
*stream, RstStreamFrame(streamID, unknownErrorCode, ingressDataLen));
transport->readLooper()->runLoopCallback();
// same test as above, but we tx a rst stream first followed by send stop
// sending second to validate that .stopSending() queues stream to be closed
NiceMock<MockReadCallback> readCb2;
streamID = transport->createBidirectionalStream().value();
transport->setReadCallback(streamID, &readCb2);
// add ingress & egress data to new stream
transport->addDataToStream(
streamID, StreamBuffer(folly::IOBuf::copyBuffer(ingressData), 0));
transport->writeChain(
streamID, folly::IOBuf::copyBuffer("some egress stream data"), false);
transport->driveReadCallbacks();
stream = CHECK_NOTNULL(transport->getStream(streamID));
// check stream has readable data and SM is open
EXPECT_TRUE(stream->hasReadableData());
EXPECT_EQ(stream->sendState, StreamSendState::Open);
EXPECT_EQ(stream->recvState, StreamRecvState::Open);
// suppose we tx a rst stream (and rx its corresponding ack)
transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR);
sendRstAckSMHandler(*stream);
EXPECT_EQ(stream->sendState, StreamSendState::Closed);
EXPECT_EQ(stream->recvState, StreamRecvState::Open);
transport->driveReadCallbacks();
// send stop sending to peer does not deliver an error to the read callback
// even tho the stream is in terminal state and queued for closing
EXPECT_CALL(readCb2, readError(streamID, _)).Times(0);
transport->stopSending(streamID, GenericApplicationErrorCode::NO_ERROR);
// check that we've discarded any ingress data and ingress SM is closed,
// expect terminal state and queued in closed streams
EXPECT_FALSE(stream->hasReadableData());
EXPECT_FALSE(streamManager.readableStreams().contains(streamID));
EXPECT_TRUE(stream->inTerminalStates());
EXPECT_TRUE(streamManager.closedStreams().contains(streamID));
// we need to rx a rst stream before queue stream to be closed to allow
// delivering callback to application
EXPECT_CALL(readCb2, readError(streamID, QuicError(unknownErrorCode)))
.Times(1);
receiveRstStreamSMHandler(
*stream, RstStreamFrame(streamID, unknownErrorCode, ingressDataLen));
EXPECT_TRUE(stream->inTerminalStates());
EXPECT_TRUE(streamManager.closedStreams().contains(streamID));
transport->readLooper()->runLoopCallback();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, NoopStopSendingIngressClosed) {
// create bidi stream
auto streamID = transport->createBidirectionalStream().value();
auto* stream = CHECK_NOTNULL(transport->getStream(streamID));