1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Remove partial reliability from mvfst.

Summary: As in title.

Reviewed By: yangchi

Differential Revision: D26701886

fbshipit-source-id: c7b36c616200b17fbf697eff4ba0d18695effb45
This commit is contained in:
Matt Joras
2021-03-03 15:29:11 -08:00
committed by Facebook GitHub Bot
parent 94676b44f8
commit 382c1cdcc6
55 changed files with 27 additions and 2686 deletions

View File

@@ -158,8 +158,6 @@ enum class FrameType : uint64_t {
// CONNECTION_CLOSE_APP_ERR frametype is used to indicate application errors
CONNECTION_CLOSE_APP_ERR = 0x1D,
HANDSHAKE_DONE = 0x1E,
MIN_STREAM_DATA = 0xFE, // subject to change
EXPIRED_STREAM_DATA = 0xFF, // subject to change
KNOB = 0x1550,
ACK_FREQUENCY = 0xAF,
};
@@ -261,14 +259,10 @@ enum class QuicVersion : uint32_t {
using QuicVersionType = std::underlying_type<QuicVersion>::type;
using TransportPartialReliabilitySetting = bool;
/**
* Parameter ids for private transport parameter
*/
constexpr uint16_t kPartialReliabilityParameterId = 0xFF00; // subject to change
constexpr uint16_t kD6DBasePMTUParameterId = 0xFF77;
constexpr uint16_t kD6DRaiseTimeoutParameterId = 0xFF95;
@@ -500,9 +494,6 @@ constexpr uint64_t kMaxRetryTokenValidMs = 1000 * 60 * 5;
constexpr uint64_t kDefaultActiveConnectionIdLimit = 2;
// default capability of QUIC partial reliability
constexpr TransportPartialReliabilitySetting kDefaultPartialReliability = false;
constexpr uint64_t kMaxPacketNumber = (1ull << 62) - 1;
// Use up to 3 bytes for the initial packet number.

View File

@@ -447,11 +447,6 @@ class QuicSocket {
*/
FOLLY_NODISCARD virtual bool isKnobSupported() const = 0;
/**
* Is partial reliability supported.
*/
virtual bool isPartiallyReliableTransport() const = 0;
/**
* Set stream priority.
* level: can only be in [0, 7].
@@ -667,68 +662,6 @@ class QuicSocket {
StreamId id,
size_t amount) = 0;
/**
* ===== Expire/reject API =====
*
* Sender can "expire" stream data by advancing receiver's minimum
* retransmittable offset, effectively discarding some of the previously
* sent (or planned to be sent) data.
* Discarded data may or may not have already arrived on the receiver end.
*
* Receiver can "reject" stream data by advancing sender's minimum
* retransmittable offset, effectively discarding some of the previously
* sent (or planned to be sent) data.
* Rejected data may or may not have already arrived on the receiver end.
*/
class DataExpiredCallback {
public:
virtual ~DataExpiredCallback() = default;
/**
* Called from the transport layer when sender informs us that data is
* expired on a given stream.
*/
virtual void onDataExpired(StreamId id, uint64_t newOffset) noexcept = 0;
};
virtual folly::Expected<folly::Unit, LocalErrorCode> setDataExpiredCallback(
StreamId id,
DataExpiredCallback* cb) = 0;
/**
* Expire data.
*
* The return value is Expected. If the value hasError(), then an error
* occurred and it can be obtained with error().
*/
virtual folly::Expected<folly::Optional<uint64_t>, LocalErrorCode>
sendDataExpired(StreamId id, uint64_t offset) = 0;
class DataRejectedCallback {
public:
virtual ~DataRejectedCallback() = default;
/**
* Called from the transport layer when receiver informs us that data is
* not needed anymore on a given stream.
*/
virtual void onDataRejected(StreamId id, uint64_t newOffset) noexcept = 0;
};
virtual folly::Expected<folly::Unit, LocalErrorCode> setDataRejectedCallback(
StreamId id,
DataRejectedCallback* cb) = 0;
/**
* Reject data.
*
* The return value is Expected. If the value hasError(), then an error
* occurred and it can be obtained with error().
*/
virtual folly::Expected<folly::Optional<uint64_t>, LocalErrorCode>
sendDataRejected(StreamId id, uint64_t offset) = 0;
/**
* ===== Write API =====
*/

View File

@@ -1001,170 +1001,6 @@ void QuicTransportBase::invokePeekDataAndCallbacks() {
}
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setDataExpiredCallback(
StreamId id,
DataExpiredCallback* cb) {
if (!conn_->partialReliabilityEnabled) {
return folly::makeUnexpected(LocalErrorCode::APP_ERROR);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
VLOG(4) << "Setting DataExpiredCallback for stream=" << id << " cb=" << cb
<< " " << *this;
auto dataExpiredCbIt = dataExpiredCallbacks_.find(id);
if (dataExpiredCbIt == dataExpiredCallbacks_.end()) {
if (!cb) {
return folly::unit;
}
dataExpiredCbIt =
dataExpiredCallbacks_.emplace(id, DataExpiredCallbackData(cb)).first;
}
if (!cb) {
dataExpiredCallbacks_.erase(dataExpiredCbIt);
} else {
dataExpiredCbIt->second.dataExpiredCb = cb;
}
runOnEvbAsync([](auto self) { self->invokeDataExpiredCallbacks(); });
return folly::unit;
}
void QuicTransportBase::invokeDataExpiredCallbacks() {
if (!conn_->partialReliabilityEnabled || closeState_ != CloseState::OPEN) {
return;
}
auto self = sharedGuard();
for (auto streamId : self->conn_->streamManager->dataExpiredStreams()) {
auto callbackData = self->dataExpiredCallbacks_.find(streamId);
// Data expired is edge-triggered (nag only once on arrival), unlike read
// which is level-triggered (nag until application calls read() and
// clears the buffer).
if (callbackData == self->dataExpiredCallbacks_.end()) {
continue;
}
auto dataExpiredCb = callbackData->second.dataExpiredCb;
auto stream = conn_->streamManager->getStream(streamId);
if (dataExpiredCb && !stream->streamReadError) {
// If new offset is before current read offset, skip.
if (stream->currentReceiveOffset < stream->currentReadOffset) {
continue;
}
VLOG(10) << "invoking data expired callback on stream=" << streamId << " "
<< *this;
dataExpiredCb->onDataExpired(streamId, stream->currentReceiveOffset);
}
}
self->conn_->streamManager->clearDataExpired();
}
folly::Expected<folly::Optional<uint64_t>, LocalErrorCode>
QuicTransportBase::sendDataExpired(StreamId id, uint64_t offset) {
if (!conn_->partialReliabilityEnabled) {
return folly::makeUnexpected(LocalErrorCode::APP_ERROR);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id);
auto newOffset = advanceMinimumRetransmittableOffset(stream, offset);
// Cancel byte event callbacks that are set for any offset below newOffset.
if (newOffset) {
cancelByteEventCallbacksForStream(id, *newOffset);
}
updateWriteLooper(true);
return folly::makeExpected<LocalErrorCode>(newOffset);
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setDataRejectedCallback(
StreamId id,
DataRejectedCallback* cb) {
if (!conn_->partialReliabilityEnabled) {
return folly::makeUnexpected(LocalErrorCode::APP_ERROR);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
VLOG(4) << "Setting DataRejectedCallback for stream=" << id << " cb=" << cb
<< " " << *this;
auto dataRejectedCbIt = dataRejectedCallbacks_.find(id);
if (dataRejectedCbIt == dataRejectedCallbacks_.end()) {
if (!cb) {
return folly::unit;
}
dataRejectedCbIt =
dataRejectedCallbacks_.emplace(id, DataRejectedCallbackData(cb)).first;
}
if (!cb) {
dataRejectedCallbacks_.erase(dataRejectedCbIt);
} else {
dataRejectedCbIt->second.dataRejectedCb = cb;
}
runOnEvbAsync([](auto self) { self->invokeDataRejectedCallbacks(); });
return folly::unit;
}
void QuicTransportBase::invokeDataRejectedCallbacks() {
if (!conn_->partialReliabilityEnabled || closeState_ != CloseState::OPEN) {
return;
}
auto self = sharedGuard();
for (auto streamId : self->conn_->streamManager->dataRejectedStreams()) {
auto callbackData = self->dataRejectedCallbacks_.find(streamId);
// Data rejected is edge-triggered (nag only once on arrival), unlike read
// which is level-triggered (nag until application calls read() and
// clears the buffer).
if (callbackData == self->dataRejectedCallbacks_.end()) {
continue;
}
auto dataRejectedCb = callbackData->second.dataRejectedCb;
auto stream = conn_->streamManager->getStream(streamId);
// Invoke any delivery callbacks that are set for any offset below newly set
// minimumRetransmittableOffset.
if (!stream->streamReadError) {
cancelByteEventCallbacksForStream(
streamId, stream->minimumRetransmittableOffset);
}
if (dataRejectedCb && !stream->streamReadError) {
VLOG(10) << "invoking data rejected callback on stream=" << streamId
<< " " << *this;
dataRejectedCb->onDataRejected(
streamId, stream->minimumRetransmittableOffset);
}
}
self->conn_->streamManager->clearDataRejected();
}
void QuicTransportBase::invokeStreamsAvailableCallbacks() {
if (conn_->streamManager->consumeMaxLocalBidirectionalStreamIdIncreased()) {
// check in case new streams were created in preceding callbacks
@@ -1184,23 +1020,6 @@ void QuicTransportBase::invokeStreamsAvailableCallbacks() {
}
}
folly::Expected<folly::Optional<uint64_t>, LocalErrorCode>
QuicTransportBase::sendDataRejected(StreamId id, uint64_t offset) {
if (!conn_->partialReliabilityEnabled) {
return folly::makeUnexpected(LocalErrorCode::APP_ERROR);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id);
auto newOffset = advanceCurrentReceiveOffset(stream, offset);
updateWriteLooper(true);
return folly::makeExpected<LocalErrorCode>(newOffset);
}
void QuicTransportBase::updatePeekLooper() {
if (peekCallbacks_.empty() || closeState_ != CloseState::OPEN) {
VLOG(10) << "Stopping peek looper " << *this;
@@ -1742,15 +1561,6 @@ void QuicTransportBase::processCallbacksAfterNetworkData() {
deliverableStreamId = conn_->streamManager->popDeliverable();
}
invokeDataExpiredCallbacks();
if (closeState_ != CloseState::OPEN) {
return;
}
invokeDataRejectedCallbacks();
if (closeState_ != CloseState::OPEN) {
return;
}
// Iterate over streams that changed their flow control window and give
// their registered listeners their updates.
// We don't really need flow control notifications when we are closed.
@@ -2666,8 +2476,6 @@ void QuicTransportBase::cancelAllAppCallbacks(
}
VLOG(4) << "Clearing " << peekCallbacks_.size() << " peek callbacks";
peekCallbacks_.clear();
dataExpiredCallbacks_.clear();
dataRejectedCallbacks_.clear();
if (connWriteCallback_) {
auto connWriteCallback = connWriteCallback_;
@@ -2699,9 +2507,6 @@ void QuicTransportBase::resetNonControlStreams(
if (writeCallbackIt != pendingWriteCallbacks_.end()) {
writeCallbackIt->second->onStreamWriteError(id, {error, errorMsg});
}
if (conn_->partialReliabilityEnabled) {
dataRejectedCallbacks_.erase(id);
}
resetStream(id, error);
}
if (isReceivingStream(conn_->nodeType, id) || isBidirectionalStream(id)) {
@@ -2710,9 +2515,6 @@ void QuicTransportBase::resetNonControlStreams(
readCallbackIt->second.readCb) {
readCallbackIt->second.readCb->readError(id, {error, errorMsg});
}
if (conn_->partialReliabilityEnabled) {
dataExpiredCallbacks_.erase(id);
}
peekCallbacks_.erase(id);
stopSending(id, error);
}
@@ -2970,10 +2772,6 @@ const TransportSettings& QuicTransportBase::getTransportSettings() const {
return conn_->transportSettings;
}
bool QuicTransportBase::isPartiallyReliableTransport() const {
return conn_->partialReliabilityEnabled;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setStreamPriority(
StreamId id,

View File

@@ -151,22 +151,6 @@ class QuicTransportBase : public QuicSocket {
std::pair<LocalErrorCode, folly::Optional<uint64_t>>>
consume(StreamId id, uint64_t offset, size_t amount) override;
folly::Expected<folly::Unit, LocalErrorCode> setDataExpiredCallback(
StreamId id,
DataExpiredCallback* cb) override;
folly::Expected<folly::Optional<uint64_t>, LocalErrorCode> sendDataExpired(
StreamId id,
uint64_t offset) override;
folly::Expected<folly::Unit, LocalErrorCode> setDataRejectedCallback(
StreamId id,
DataRejectedCallback* cb) override;
folly::Expected<folly::Optional<uint64_t>, LocalErrorCode> sendDataRejected(
StreamId id,
uint64_t offset) override;
folly::Expected<StreamId, LocalErrorCode> createBidirectionalStream(
bool replaySafe = true) override;
folly::Expected<StreamId, LocalErrorCode> createUnidirectionalStream(
@@ -316,8 +300,6 @@ class QuicTransportBase : public QuicSocket {
*/
virtual std::shared_ptr<QuicTransportBase> sharedGuard() = 0;
bool isPartiallyReliableTransport() const override;
folly::Expected<folly::Unit, LocalErrorCode> setStreamPriority(
StreamId id,
PriorityLevel level,
@@ -639,8 +621,6 @@ class QuicTransportBase : public QuicSocket {
void processCallbacksAfterNetworkData();
void invokeReadDataAndCallbacks();
void invokePeekDataAndCallbacks();
void invokeDataExpiredCallbacks();
void invokeDataRejectedCallbacks();
void invokeStreamsAvailableCallbacks();
void updateReadLooper();
void updatePeekLooper();
@@ -785,28 +765,12 @@ class QuicTransportBase : public QuicSocket {
PeekCallbackData(PeekCallback* peekCallback) : peekCb(peekCallback) {}
};
struct DataExpiredCallbackData {
DataExpiredCallback* dataExpiredCb;
bool resumed{true};
DataExpiredCallbackData(DataExpiredCallback* cb) : dataExpiredCb(cb) {}
};
struct DataRejectedCallbackData {
DataRejectedCallback* dataRejectedCb;
bool resumed{true};
DataRejectedCallbackData(DataRejectedCallback* cb) : dataRejectedCb(cb) {}
};
folly::F14FastMap<StreamId, ReadCallbackData> readCallbacks_;
folly::F14FastMap<StreamId, PeekCallbackData> peekCallbacks_;
ByteEventMap deliveryCallbacks_;
ByteEventMap txCallbacks_;
folly::F14FastMap<StreamId, DataExpiredCallbackData> dataExpiredCallbacks_;
folly::F14FastMap<StreamId, DataRejectedCallbackData> dataRejectedCallbacks_;
PingCallback* pingCallback_;
WriteCallback* connWriteCallback_{nullptr};

View File

@@ -95,7 +95,6 @@ class MockQuicSocket : public QuicSocket {
uint64_t,
SharedBuf));
MOCK_CONST_METHOD0(isKnobSupported, bool());
MOCK_CONST_METHOD0(isPartiallyReliableTransport, bool());
MOCK_METHOD3(
setStreamPriority,
folly::Expected<folly::Unit, LocalErrorCode>(StreamId, uint8_t, bool));
@@ -252,30 +251,6 @@ class MockQuicSocket : public QuicSocket {
consume,
folly::Expected<folly::Unit, LocalErrorCode>(StreamId, size_t));
MOCK_METHOD2(
setDataExpiredCallback,
folly::Expected<folly::Unit, LocalErrorCode>(
StreamId,
DataExpiredCallback*));
MOCK_METHOD2(
sendDataExpired,
folly::Expected<folly::Optional<uint64_t>, LocalErrorCode>(
StreamId,
uint64_t offset));
MOCK_METHOD2(
setDataRejectedCallback,
folly::Expected<folly::Unit, LocalErrorCode>(
StreamId,
DataRejectedCallback*));
MOCK_METHOD2(
sendDataRejected,
folly::Expected<folly::Optional<uint64_t>, LocalErrorCode>(
StreamId,
uint64_t offset));
MOCK_METHOD1(setCongestionControl, void(CongestionControlType));
ConnectionCallback* cb_;

View File

@@ -164,18 +164,6 @@ class MockByteEventCallback : public QuicSocket::ByteEventCallback {
}
};
class MockDataExpiredCallback : public QuicSocket::DataExpiredCallback {
public:
~MockDataExpiredCallback() override = default;
GMOCK_METHOD2_(, noexcept, , onDataExpired, void(StreamId, uint64_t));
};
class MockDataRejectedCallback : public QuicSocket::DataRejectedCallback {
public:
~MockDataRejectedCallback() override = default;
GMOCK_METHOD2_(, noexcept, , onDataRejected, void(StreamId, uint64_t));
};
class MockQuicTransport : public QuicServerTransport {
public:
using Ptr = std::shared_ptr<MockQuicTransport>;

View File

@@ -66,27 +66,6 @@ Buf encodeCryptoBuffer(StreamBuffer data) {
return buf;
}
// A made up encoding of a ExpiredStreamDataFrame.
Buf encodeExpiredStreamDataFrame(const ExpiredStreamDataFrame& frame) {
auto buf = IOBuf::create(17);
folly::io::Appender appender(buf.get(), 17);
appender.writeBE(static_cast<uint8_t>(TestFrameType::EXPIRED_DATA));
appender.writeBE<uint64_t>(frame.streamId);
appender.writeBE<uint64_t>(frame.minimumStreamOffset);
return buf;
}
// A made up encoding of a MinStreamDataFrame.
Buf encodeMinStreamDataFrame(const MinStreamDataFrame& frame) {
auto buf = IOBuf::create(25);
folly::io::Appender appender(buf.get(), 25);
appender.writeBE(static_cast<uint8_t>(TestFrameType::REJECTED_DATA));
appender.writeBE<uint64_t>(frame.streamId);
appender.writeBE<uint64_t>(frame.maximumData);
appender.writeBE<uint64_t>(frame.minimumStreamOffset);
return buf;
}
// A made up encoding of a MaxStreamsFrame.
Buf encodeMaxStreamsFrame(const MaxStreamsFrame& frame) {
auto buf = IOBuf::create(25);
@@ -120,21 +99,6 @@ StreamBuffer decodeCryptoBuffer(folly::io::Cursor& cursor) {
return StreamBuffer(std::move(dataBuffer.first), dataBuffer.second, false);
}
ExpiredStreamDataFrame decodeExpiredStreamDataFrame(folly::io::Cursor& cursor) {
ExpiredStreamDataFrame frame = ExpiredStreamDataFrame(0, 0);
frame.streamId = cursor.readBE<uint64_t>();
frame.minimumStreamOffset = cursor.readBE<uint64_t>();
return frame;
}
MinStreamDataFrame decodeMinStreamDataFrame(folly::io::Cursor& cursor) {
MinStreamDataFrame frame = MinStreamDataFrame(0, 0, 0);
frame.streamId = cursor.readBE<uint64_t>();
frame.maximumData = cursor.readBE<uint64_t>();
frame.minimumStreamOffset = cursor.readBE<uint64_t>();
return frame;
}
MaxStreamsFrame decodeMaxStreamsFrame(folly::io::Cursor& cursor) {
bool isBidi = cursor.readBE<uint8_t>();
auto maxStreams = cursor.readBE<uint64_t>();
@@ -197,23 +161,6 @@ class TestQuicTransport
auto cryptoBuffer = decodeCryptoBuffer(cursor);
appendDataToReadBuffer(
conn_->cryptoState->initialStream, std::move(cryptoBuffer));
} else if (type == TestFrameType::EXPIRED_DATA) {
auto expiredDataFrame = decodeExpiredStreamDataFrame(cursor);
QuicStreamState* stream =
conn_->streamManager->getStream(expiredDataFrame.streamId);
if (!stream) {
continue;
}
onRecvExpiredStreamDataFrame(stream, expiredDataFrame);
} else if (type == TestFrameType::REJECTED_DATA) {
auto minDataFrame = decodeMinStreamDataFrame(cursor);
QuicStreamState* stream =
conn_->streamManager->getStream(minDataFrame.streamId);
if (!stream) {
continue;
}
onRecvMinStreamDataFrame(stream, minDataFrame, packetNum_);
packetNum_++;
} else if (type == TestFrameType::MAX_STREAMS) {
auto maxStreamsFrame = decodeMaxStreamsFrame(cursor);
if (maxStreamsFrame.isForBidirectionalStream()) {
@@ -325,18 +272,6 @@ class TestQuicTransport
onNetworkData(addr, NetworkData(std::move(buf), Clock::now()));
}
void addExpiredStreamDataFrameToStream(ExpiredStreamDataFrame frame) {
auto buf = encodeExpiredStreamDataFrame(frame);
SocketAddress addr("127.0.0.1", 1000);
onNetworkData(addr, NetworkData(std::move(buf), Clock::now()));
}
void addMinStreamDataFrameToStream(MinStreamDataFrame frame) {
auto buf = encodeMinStreamDataFrame(frame);
SocketAddress addr("127.0.0.1", 1000);
onNetworkData(addr, NetworkData(std::move(buf), Clock::now()));
}
void addMaxStreamsFrame(MaxStreamsFrame frame) {
auto buf = encodeMaxStreamsFrame(frame);
SocketAddress addr("127.0.0.1", 1000);
@@ -1393,89 +1328,6 @@ TEST_F(QuicTransportImplTest, DeliveryCallbackUnsetOne) {
transport->close(folly::none);
}
TEST_F(QuicTransportImplTest, TxDeliveryCallbackOnSendDataExpire) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream2, 10, &txcb2);
transport->registerDeliveryCallback(stream1, 10, &dcb1);
transport->registerDeliveryCallback(stream2, 10, &dcb2);
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(dcb1, onCanceled(_, _));
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
auto res = transport->sendDataExpired(stream1, 11);
EXPECT_EQ(res.hasError(), false);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10)));
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(_, _));
transport->close(folly::none);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
}
TEST_F(QuicTransportImplTest, TxDeliveryCallbackOnSendDataExpireCallbacksLeft) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream1, 20, &txcb1);
transport->registerTxCallback(stream2, 10, &txcb2);
transport->registerTxCallback(stream2, 20, &txcb2);
transport->registerDeliveryCallback(stream1, 10, &dcb1);
transport->registerDeliveryCallback(stream1, 20, &dcb1);
transport->registerDeliveryCallback(stream2, 10, &dcb2);
transport->registerDeliveryCallback(stream2, 20, &dcb2);
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(dcb1, onCanceled(_, _));
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
auto res = transport->sendDataExpired(stream1, 11);
EXPECT_EQ(res.hasError(), false);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 20)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20)));
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(1);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(2);
transport->close(folly::none);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
}
TEST_F(QuicTransportImplTest, RegisterTxDeliveryCallbackLowerThanExpected) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
@@ -2929,439 +2781,6 @@ TEST_F(QuicTransportImplTest, UpdatePeekableListWithStreamErrorTest) {
EXPECT_EQ(0, conn->streamManager->peekableStreams().count(streamId));
}
TEST_F(QuicTransportImplTest, DataExpiredCallbackDataAvailable) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StreamId stream3 = 0x6;
NiceMock<MockDataExpiredCallback> dataExpiredCb1;
NiceMock<MockDataExpiredCallback> dataExpiredCb2;
NiceMock<MockDataExpiredCallback> dataExpiredCb3;
transport->setDataExpiredCallback(stream1, &dataExpiredCb1);
transport->setDataExpiredCallback(stream2, &dataExpiredCb2);
EXPECT_CALL(dataExpiredCb1, onDataExpired(stream1, 5));
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream1, 5));
EXPECT_CALL(dataExpiredCb2, onDataExpired(stream2, 13));
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream2, 13));
EXPECT_CALL(dataExpiredCb3, onDataExpired(_, _)).Times(0);
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream3, 42));
transport.reset();
}
TEST_F(QuicTransportImplTest, DataExpiredCallbackDataAvailableWithDataRead) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockDataExpiredCallback> dataExpiredCb1;
NiceMock<MockReadCallback> readCb1;
transport->setDataExpiredCallback(stream1, &dataExpiredCb1);
transport->setReadCallback(stream1, &readCb1);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->driveReadCallbacks();
transport->read(stream1, 3);
// readOffset must be at 3, abandoned bytes length must be 2.
EXPECT_CALL(dataExpiredCb1, onDataExpired(stream1, 5));
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream1, 5));
transport.reset();
}
class TestDataExpiredCallback : public QuicSocket::DataExpiredCallback {
public:
~TestDataExpiredCallback() override = default;
TestDataExpiredCallback(TestQuicTransport& transport, StreamId streamId)
: transport_(transport), streamId_(streamId) {}
void onDataExpired(StreamId, uint64_t) noexcept override {
auto peekCallback = [&](StreamId /* id */,
const folly::Range<PeekIterator>& /* range */) {
cbCalled_ = true;
};
transport_.peek(streamId_, std::move(peekCallback));
}
bool wasCbCalled() const {
return cbCalled_;
}
private:
TestQuicTransport& transport_;
StreamId streamId_;
bool cbCalled_{false};
};
TEST_F(
QuicTransportImplTest,
DataExpiredCallbackDataAvailableWithDataReadAndPeek) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
TestDataExpiredCallback peekExecCb =
TestDataExpiredCallback(*transport, stream1);
NiceMock<MockReadCallback> readCb1;
transport->setDataExpiredCallback(stream1, &peekExecCb);
transport->setReadCallback(stream1, &readCb1);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->driveReadCallbacks();
transport->read(stream1, 3);
EXPECT_FALSE(peekExecCb.wasCbCalled());
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream1, 5));
EXPECT_TRUE(peekExecCb.wasCbCalled());
transport.reset();
}
TEST_F(QuicTransportImplTest, DataExpiredCallbackChangeCallback) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockDataExpiredCallback> dataExpiredCb1;
NiceMock<MockDataExpiredCallback> dataExpiredCb2;
transport->setDataExpiredCallback(stream1, &dataExpiredCb1);
EXPECT_CALL(dataExpiredCb1, onDataExpired(stream1, 5));
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream1, 5));
transport->setDataExpiredCallback(stream1, &dataExpiredCb2);
EXPECT_CALL(dataExpiredCb2, onDataExpired(stream1, 6));
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream1, 6));
transport.reset();
}
TEST_F(QuicTransportImplTest, DataExpiredCallbackNoCallbackSet) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream1, 5));
transport->addExpiredStreamDataFrameToStream(
ExpiredStreamDataFrame(stream1, 6));
transport.reset();
}
TEST_F(QuicTransportImplTest, DataExpiredCallbackInvalidStream) {
transport->transportConn->partialReliabilityEnabled = true;
NiceMock<MockDataExpiredCallback> dataExpiredCb1;
StreamId invalidStream = 10;
EXPECT_TRUE(transport->setDataExpiredCallback(invalidStream, &dataExpiredCb1)
.hasError());
transport.reset();
}
TEST_F(QuicTransportImplTest, SendDataExpired) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto streamState =
transport->transportConn->streamManager->getStream(stream1);
streamState->minimumRetransmittableOffset = 2;
streamState->flowControlState.peerAdvertisedMaxOffset = 10;
// Expect minimumRetransmittableOffset shift to 5.
auto res = transport->sendDataExpired(stream1, 5);
EXPECT_EQ(res.hasError(), false);
auto newOffsetOpt = res.value();
EXPECT_EQ(newOffsetOpt.has_value(), true);
EXPECT_EQ(newOffsetOpt.value(), 5);
EXPECT_EQ(streamState->minimumRetransmittableOffset, 5);
// Expect minimumRetransmittableOffset stay the same
// because 3 is smaller than current minimumRetransmittableOffset.
res = transport->sendDataExpired(stream1, 3);
EXPECT_EQ(res.hasError(), false);
newOffsetOpt = res.value();
EXPECT_EQ(newOffsetOpt.has_value(), false);
EXPECT_EQ(streamState->minimumRetransmittableOffset, 5);
// Expect minimumRetransmittableOffset be set to 10
// because 11 is larger than flowControlState.peerAdvertisedMaxOffset.
res = transport->sendDataExpired(stream1, 11);
EXPECT_EQ(res.hasError(), false);
newOffsetOpt = res.value();
EXPECT_EQ(newOffsetOpt.has_value(), true);
EXPECT_EQ(newOffsetOpt.value(), 10);
EXPECT_EQ(
streamState->minimumRetransmittableOffset,
streamState->flowControlState.peerAdvertisedMaxOffset);
transport.reset();
}
TEST_F(QuicTransportImplTest, DataRejecteddCallbackDataAvailable) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StreamId stream3 = 0x6;
NiceMock<MockDataRejectedCallback> dataRejectedCb1;
NiceMock<MockDataRejectedCallback> dataRejectedCb2;
NiceMock<MockDataRejectedCallback> dataRejectedCb3;
transport->setDataRejectedCallback(stream1, &dataRejectedCb1);
transport->setDataRejectedCallback(stream2, &dataRejectedCb2);
EXPECT_CALL(dataRejectedCb1, onDataRejected(stream1, 5));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 5));
EXPECT_CALL(dataRejectedCb2, onDataRejected(stream2, 13));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream2, kDefaultStreamWindowSize, 13));
EXPECT_CALL(dataRejectedCb3, onDataRejected(_, _)).Times(0);
transport->addMinStreamDataFrameToStream(MinStreamDataFrame(stream3, 42, 39));
transport.reset();
}
TEST_F(QuicTransportImplTest, DataRejecteddCallbackWithTxAndDeliveryCallbacks) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
NiceMock<MockDataRejectedCallback> dataRejectedCb1;
NiceMock<MockDataRejectedCallback> dataRejectedCb2;
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream2, 20, &txcb2);
transport->registerDeliveryCallback(stream1, 10, &dcb1);
transport->registerDeliveryCallback(stream2, 20, &dcb2);
transport->setDataRejectedCallback(stream1, &dataRejectedCb1);
transport->setDataRejectedCallback(stream2, &dataRejectedCb2);
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))).Times(1);
EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(dcb1, onCanceled(stream1, 10)).Times(1);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
EXPECT_CALL(dataRejectedCb1, onDataRejected(stream1, 15));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 15));
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dataRejectedCb1);
EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20))).Times(1);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(stream2, 20)).Times(1);
EXPECT_CALL(dataRejectedCb2, onDataRejected(stream2, 23));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream2, kDefaultStreamWindowSize, 23));
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dataRejectedCb2);
EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
transport->close(folly::none);
}
TEST_F(
QuicTransportImplTest,
DataRejecteddCallbackWithTxAndDeliveryCallbacksSomeLeft) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
NiceMock<MockDataRejectedCallback> dataRejectedCb1;
NiceMock<MockDataRejectedCallback> dataRejectedCb2;
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream1, 25, &txcb1);
transport->registerTxCallback(stream2, 20, &txcb2);
transport->registerTxCallback(stream2, 29, &txcb2);
transport->registerDeliveryCallback(stream1, 10, &dcb1);
transport->registerDeliveryCallback(stream1, 25, &dcb1);
transport->registerDeliveryCallback(stream2, 20, &dcb2);
transport->registerDeliveryCallback(stream2, 29, &dcb2);
transport->setDataRejectedCallback(stream1, &dataRejectedCb1);
transport->setDataRejectedCallback(stream2, &dataRejectedCb2);
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10))).Times(1);
EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(dcb1, onCanceled(stream1, 10)).Times(1);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
EXPECT_CALL(dataRejectedCb1, onDataRejected(stream1, 15));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 15));
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dataRejectedCb1);
EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20))).Times(1);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(stream2, 20)).Times(1);
EXPECT_CALL(dataRejectedCb2, onDataRejected(stream2, 23));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream2, kDefaultStreamWindowSize, 23));
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dataRejectedCb2);
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 25))).Times(1);
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 29))).Times(1);
EXPECT_CALL(dcb1, onCanceled(stream1, 25)).Times(1);
EXPECT_CALL(dcb2, onCanceled(stream2, 29)).Times(1);
transport->close(folly::none);
}
TEST_F(QuicTransportImplTest, DataRejectedCallbackChangeCallback) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockDataRejectedCallback> dataRejectedCb1;
NiceMock<MockDataRejectedCallback> dataRejectedCb2;
transport->setDataRejectedCallback(stream1, &dataRejectedCb1);
EXPECT_CALL(dataRejectedCb1, onDataRejected(stream1, 5));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 5));
transport->setDataRejectedCallback(stream1, &dataRejectedCb2);
EXPECT_CALL(dataRejectedCb2, onDataRejected(stream1, 6));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 6));
transport.reset();
}
TEST_F(QuicTransportImplTest, DataRejectedCallbackNoCallbackSet) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 5));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 6));
transport.reset();
}
TEST_F(QuicTransportImplTest, DataRejectedCallbackInvalidStream) {
transport->transportConn->partialReliabilityEnabled = true;
NiceMock<MockDataRejectedCallback> dataRejectedCb1;
StreamId invalidStream = 10;
EXPECT_TRUE(
transport->setDataRejectedCallback(invalidStream, &dataRejectedCb1)
.hasError());
transport.reset();
}
TEST_F(QuicTransportImplTest, SendDataRejected) {
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto streamState =
transport->transportConn->streamManager->getStream(stream1);
streamState->currentReceiveOffset = 0;
// Expect currentReceiveOffset to move to 5.
auto res = transport->sendDataRejected(stream1, 5);
EXPECT_EQ(res.hasError(), false);
auto newOffsetOpt = res.value();
EXPECT_EQ(newOffsetOpt.has_value(), true);
EXPECT_EQ(newOffsetOpt.value(), 5);
EXPECT_EQ(streamState->currentReceiveOffset, 5);
// Expect currentReceiveOffset to stay the same.
res = transport->sendDataRejected(stream1, 3);
EXPECT_EQ(res.hasError(), false);
newOffsetOpt = res.value();
EXPECT_EQ(newOffsetOpt.has_value(), false);
EXPECT_EQ(streamState->currentReceiveOffset, 5);
transport.reset();
}
TEST_F(QuicTransportImplTest, CloseFromCancelDeliveryCallbacksForStream) {
auto stream1 = *transport->createBidirectionalStream();
auto stream2 = *transport->createBidirectionalStream();
NiceMock<MockDeliveryCallback> deliveryCallback1;
NiceMock<MockDeliveryCallback> deliveryCallback2;
NiceMock<MockDeliveryCallback> deliveryCallback3;
transport->registerDeliveryCallback(stream1, 10, &deliveryCallback1);
transport->registerDeliveryCallback(stream1, 20, &deliveryCallback2);
transport->registerDeliveryCallback(stream2, 10, &deliveryCallback3);
EXPECT_CALL(deliveryCallback1, onCanceled(stream1, _))
.WillOnce(Invoke([&](auto, auto) { transport->close(folly::none); }));
EXPECT_CALL(deliveryCallback2, onCanceled(stream1, _));
EXPECT_CALL(deliveryCallback3, onCanceled(stream2, _));
transport->cancelDeliveryCallbacksForStream(stream1);
}
TEST_F(QuicTransportImplTest, SuccessfulPing) {
auto conn = transport->transportConn;
std::chrono::milliseconds interval(10);

View File

@@ -877,7 +877,6 @@ void QuicClientTransport::startCryptoHandshake() {
*clientConn_->initialDestinationConnectionId, version);
// Add partial reliability parameter to customTransportParameters_.
setPartialReliabilityTransportParameter();
setD6DBasePMTUTransportParameter();
setD6DRaiseTimeoutTransportParameter();
setD6DProbeTimeoutTransportParameter();
@@ -1523,20 +1522,6 @@ bool QuicClientTransport::setCustomTransportParameter(
return true;
}
void QuicClientTransport::setPartialReliabilityTransportParameter() {
uint64_t partialReliabilitySetting = 0;
if (conn_->transportSettings.partialReliabilityEnabled) {
partialReliabilitySetting = 1;
}
auto partialReliabilityCustomParam =
std::make_unique<CustomIntegralTransportParameter>(
kPartialReliabilityParameterId, partialReliabilitySetting);
if (!setCustomTransportParameter(std::move(partialReliabilityCustomParam))) {
LOG(ERROR) << "failed to set partial reliability transport parameter";
}
}
void QuicClientTransport::setD6DBasePMTUTransportParameter() {
if (!conn_->transportSettings.d6dConfig.enabled) {
return;

View File

@@ -205,7 +205,6 @@ class QuicClientTransport
HappyEyeballsConnAttemptDelayTimeout happyEyeballsConnAttemptDelayTimeout_;
private:
void setPartialReliabilityTransportParameter();
void setD6DBasePMTUTransportParameter();
void setD6DRaiseTimeoutTransportParameter();
void setD6DProbeTimeoutTransportParameter();

View File

@@ -97,9 +97,6 @@ void processServerInitialParams(
TransportParameterId::max_packet_size, serverParams.parameters);
auto statelessResetToken =
getStatelessResetTokenParameter(serverParams.parameters);
auto partialReliability = getIntegerParameter(
static_cast<TransportParameterId>(kPartialReliabilityParameterId),
serverParams.parameters);
auto activeConnectionIdLimit = getIntegerParameter(
TransportParameterId::active_connection_id_limit,
serverParams.parameters);
@@ -178,13 +175,6 @@ void processServerInitialParams(
conn.peerActiveConnectionIdLimit =
activeConnectionIdLimit.value_or(kDefaultActiveConnectionIdLimit);
if (partialReliability && *partialReliability != 0 &&
conn.transportSettings.partialReliabilityEnabled) {
conn.partialReliabilityEnabled = true;
}
VLOG(10) << "conn.partialReliabilityEnabled="
<< conn.partialReliabilityEnabled;
conn.statelessResetToken = std::move(statelessResetToken);
// Update the existing streams, because we allow streams to be created before
// the connection is established.

View File

@@ -667,53 +667,6 @@ ConnectionCloseFrame decodeApplicationClose(folly::io::Cursor& cursor) {
QuicErrorCode(errorCode), std::move(reasonPhrase));
}
MinStreamDataFrame decodeMinStreamDataFrame(folly::io::Cursor& cursor) {
auto streamId = decodeQuicInteger(cursor);
if (!streamId) {
throw QuicTransportException(
"Invalid streamId",
quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::MIN_STREAM_DATA);
}
auto maximumData = decodeQuicInteger(cursor);
if (!maximumData) {
throw QuicTransportException(
"Invalid maximumData",
quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::MIN_STREAM_DATA);
}
auto minimumStreamOffset = decodeQuicInteger(cursor);
if (!minimumStreamOffset) {
throw QuicTransportException(
"Invalid minimumStreamOffset",
quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::MIN_STREAM_DATA);
}
return MinStreamDataFrame(
folly::to<StreamId>(streamId->first),
maximumData->first,
minimumStreamOffset->first);
}
ExpiredStreamDataFrame decodeExpiredStreamDataFrame(folly::io::Cursor& cursor) {
auto streamId = decodeQuicInteger(cursor);
if (!streamId) {
throw QuicTransportException(
"Invalid streamId",
quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::EXPIRED_STREAM_DATA);
}
auto minimumStreamOffset = decodeQuicInteger(cursor);
if (!minimumStreamOffset) {
throw QuicTransportException(
"Invalid minimumStreamOffset",
quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::EXPIRED_STREAM_DATA);
}
return ExpiredStreamDataFrame(
folly::to<StreamId>(streamId->first), minimumStreamOffset->first);
}
HandshakeDoneFrame decodeHandshakeDoneFrame(folly::io::Cursor& /*cursor*/) {
return HandshakeDoneFrame();
}
@@ -843,10 +796,6 @@ QuicFrame parseFrame(
return QuicFrame(decodeConnectionCloseFrame(cursor));
case FrameType::CONNECTION_CLOSE_APP_ERR:
return QuicFrame(decodeApplicationClose(cursor));
case FrameType::MIN_STREAM_DATA:
return QuicFrame(decodeMinStreamDataFrame(cursor));
case FrameType::EXPIRED_STREAM_DATA:
return QuicFrame(decodeExpiredStreamDataFrame(cursor));
case FrameType::HANDSHAKE_DONE:
return QuicFrame(decodeHandshakeDoneFrame(cursor));
case FrameType::KNOB:

View File

@@ -84,10 +84,6 @@ MaxDataFrame decodeMaxDataFrame(folly::io::Cursor& cursor);
MaxStreamDataFrame decodeMaxStreamDataFrame(folly::io::Cursor& cursor);
ExpiredStreamDataFrame decodeExpiredStreamDataFrame(folly::io::Cursor& cursor);
MinStreamDataFrame decodeMinStreamDataFrame(folly::io::Cursor& cursor);
MaxStreamsFrame decodeBiDiMaxStreamsFrame(folly::io::Cursor& cursor);
MaxStreamsFrame decodeUniMaxStreamsFrame(folly::io::Cursor& cursor);

View File

@@ -227,12 +227,10 @@ const BufQueue* PacketRebuilder::cloneRetransmissionBuffer(
const WriteStreamFrame& frame,
const QuicStreamState* stream) {
/**
* StreamBuffer is removed from retransmissionBuffer in 4 cases.
* StreamBuffer is removed from retransmissionBuffer in 3 cases.
* 1: After send or receive RST.
* 2: Packet containing the buffer gets acked.
* 3: Packet containing the buffer is marked loss.
* 4: Skip (MIN_DATA or EXPIRED_DATA) frame is received with offset larger
* than what's in the retransmission buffer.
*
* Checking retransmittable() should cover first case. The latter three cases
* have to be covered by making sure we do not clone an already acked, lost or
@@ -242,12 +240,10 @@ const BufQueue* PacketRebuilder::cloneRetransmissionBuffer(
DCHECK(retransmittable(*stream));
auto iter = stream->retransmissionBuffer.find(frame.offset);
if (iter != stream->retransmissionBuffer.end()) {
if (streamFrameMatchesRetransmitBuffer(*stream, frame, *iter->second)) {
DCHECK(!frame.len || !iter->second->data.empty())
<< "WriteStreamFrame cloning: frame is not empty but StreamBuffer has"
<< " empty data. " << conn_;
return frame.len ? &(iter->second->data) : nullptr;
}
DCHECK(!frame.len || !iter->second->data.empty())
<< "WriteStreamFrame cloning: frame is not empty but StreamBuffer has"
<< " empty data. " << conn_;
return frame.len ? &(iter->second->data) : nullptr;
}
return nullptr;
}

View File

@@ -346,47 +346,6 @@ size_t writeSimpleFrame(
// no space left in packet
return size_t(0);
}
case QuicSimpleFrame::Type::MinStreamDataFrame: {
const MinStreamDataFrame& minStreamDataFrame =
*frame.asMinStreamDataFrame();
QuicInteger streamId(minStreamDataFrame.streamId);
QuicInteger maximumData(minStreamDataFrame.maximumData);
QuicInteger minimumStreamOffset(minStreamDataFrame.minimumStreamOffset);
QuicInteger frameType(
static_cast<FrameTypeType>(FrameType::MIN_STREAM_DATA));
auto minStreamDataFrameSize = frameType.getSize() + streamId.getSize() +
maximumData.getSize() + minimumStreamOffset.getSize();
if (packetSpaceCheck(spaceLeft, minStreamDataFrameSize)) {
builder.write(frameType);
builder.write(streamId);
builder.write(maximumData);
builder.write(minimumStreamOffset);
builder.appendFrame(QuicSimpleFrame(std::move(minStreamDataFrame)));
return minStreamDataFrameSize;
}
// no space left in packet
return size_t(0);
}
case QuicSimpleFrame::Type::ExpiredStreamDataFrame: {
const ExpiredStreamDataFrame& expiredStreamDataFrame =
*frame.asExpiredStreamDataFrame();
QuicInteger frameType(
static_cast<FrameTypeType>(FrameType::EXPIRED_STREAM_DATA));
QuicInteger streamId(expiredStreamDataFrame.streamId);
QuicInteger minimumStreamOffset(
expiredStreamDataFrame.minimumStreamOffset);
auto expiredStreamDataFrameSize = frameType.getSize() +
streamId.getSize() + minimumStreamOffset.getSize();
if (packetSpaceCheck(spaceLeft, expiredStreamDataFrameSize)) {
builder.write(frameType);
builder.write(streamId);
builder.write(minimumStreamOffset);
builder.appendFrame(QuicSimpleFrame(std::move(expiredStreamDataFrame)));
return expiredStreamDataFrameSize;
}
// no space left in packet
return size_t(0);
}
case QuicSimpleFrame::Type::PathChallengeFrame: {
const PathChallengeFrame& pathChallengeFrame =
*frame.asPathChallengeFrame();

View File

@@ -415,10 +415,6 @@ std::string toString(FrameType frame) {
return "CONNECTION_CLOSE";
case FrameType::CONNECTION_CLOSE_APP_ERR:
return "APPLICATION_CLOSE";
case FrameType::MIN_STREAM_DATA:
return "MIN_STREAM_DATA";
case FrameType::EXPIRED_STREAM_DATA:
return "EXPIRED_STREAM_DATA";
case FrameType::HANDSHAKE_DONE:
return "HANDSHAKE_DONE";
case FrameType::KNOB:

View File

@@ -427,42 +427,6 @@ struct MaxStreamDataFrame {
}
};
// The MinStreamDataFrame is used by a receiver to inform
// a sender of the maximum amount of data that can be sent on a stream
// (like MAX_STREAM_DATA frame) and to request an update to the minimum
// retransmittable offset for this stream.
struct MinStreamDataFrame {
StreamId streamId;
uint64_t maximumData;
uint64_t minimumStreamOffset;
MinStreamDataFrame(
StreamId streamIdIn,
uint64_t maximumDataIn,
uint64_t minimumStreamOffsetIn)
: streamId(streamIdIn),
maximumData(maximumDataIn),
minimumStreamOffset(minimumStreamOffsetIn) {}
bool operator==(const MinStreamDataFrame& rhs) const {
return streamId == rhs.streamId && maximumData == rhs.maximumData &&
minimumStreamOffset == rhs.minimumStreamOffset;
}
};
// The ExpiredStreamDataFrame is used by a sender to
// inform a receiver of the minimum retransmittable offset for a stream.
struct ExpiredStreamDataFrame {
StreamId streamId;
uint64_t minimumStreamOffset;
ExpiredStreamDataFrame(StreamId streamIdIn, uint64_t minimumStreamOffsetIn)
: streamId(streamIdIn), minimumStreamOffset(minimumStreamOffsetIn) {}
bool operator==(const ExpiredStreamDataFrame& rhs) const {
return streamId == rhs.streamId &&
minimumStreamOffset == rhs.minimumStreamOffset;
}
};
struct MaxStreamsFrame {
// A count of the cumulative number of streams
uint64_t maxStreams;
@@ -654,8 +618,6 @@ struct RetryToken {
#define QUIC_SIMPLE_FRAME(F, ...) \
F(StopSendingFrame, __VA_ARGS__) \
F(MinStreamDataFrame, __VA_ARGS__) \
F(ExpiredStreamDataFrame, __VA_ARGS__) \
F(PathChallengeFrame, __VA_ARGS__) \
F(PathResponseFrame, __VA_ARGS__) \
F(NewConnectionIdFrame, __VA_ARGS__) \

View File

@@ -690,57 +690,6 @@ TEST_F(DecodeTest, NewTokenIncorrectDataLength) {
EXPECT_THROW(decodeNewTokenFrame(cursor), QuicTransportException);
}
std::unique_ptr<folly::IOBuf> createMinOrExpiredStreamDataFrame(
QuicInteger streamId,
folly::Optional<QuicInteger> maximumData = folly::none,
folly::Optional<QuicInteger> minimumStreamOffset = folly::none) {
std::unique_ptr<folly::IOBuf> bufQueue = folly::IOBuf::create(0);
BufAppender wcursor(bufQueue.get(), 10);
auto appenderOp = [&](auto val) { wcursor.writeBE(val); };
streamId.encode(appenderOp);
if (maximumData) {
maximumData->encode(appenderOp);
}
if (minimumStreamOffset) {
minimumStreamOffset->encode(appenderOp);
}
return bufQueue;
}
TEST_F(DecodeTest, DecodeMinStreamDataFrame) {
QuicInteger streamId(10);
QuicInteger maximumData(1000);
QuicInteger minimumStreamOffset(100);
auto noOffset = createMinOrExpiredStreamDataFrame(streamId, maximumData);
folly::io::Cursor cursor0(noOffset.get());
EXPECT_THROW(decodeMinStreamDataFrame(cursor0), QuicTransportException);
auto minStreamDataFrame = createMinOrExpiredStreamDataFrame(
streamId, maximumData, minimumStreamOffset);
folly::io::Cursor cursor(minStreamDataFrame.get());
auto result = decodeMinStreamDataFrame(cursor);
EXPECT_EQ(result.streamId, 10);
EXPECT_EQ(result.maximumData, 1000);
EXPECT_EQ(result.minimumStreamOffset, 100);
}
TEST_F(DecodeTest, DecodeExpiredStreamDataFrame) {
QuicInteger streamId(10);
QuicInteger offset(100);
auto noOffset = createMinOrExpiredStreamDataFrame(streamId);
folly::io::Cursor cursor0(noOffset.get());
EXPECT_THROW(decodeExpiredStreamDataFrame(cursor0), QuicTransportException);
auto expiredStreamDataFrame =
createMinOrExpiredStreamDataFrame(streamId, folly::none, offset);
folly::io::Cursor cursor(expiredStreamDataFrame.get());
auto result = decodeExpiredStreamDataFrame(cursor);
EXPECT_EQ(result.streamId, 10);
EXPECT_EQ(result.minimumStreamOffset, 100);
}
TEST_F(DecodeTest, ParsePlaintextRetryToken) {
ConnectionId odcid = getTestConnectionId();
folly::IPAddress clientIp("109.115.3.49");

View File

@@ -1573,71 +1573,6 @@ TEST_F(QuicWriteCodecTest, NoSpaceForNewConnId) {
EXPECT_EQ(0, writeFrame(QuicSimpleFrame(newConnId), pktBuilder));
}
TEST_F(QuicWriteCodecTest, WriteExpiredStreamDataFrame) {
MockQuicPacketBuilder pktBuilder;
setupCommonExpects(pktBuilder);
StreamId id = 10;
uint64_t offset = 0x08;
ExpiredStreamDataFrame expiredStreamDataFrame(id, offset);
auto bytesWritten =
writeFrame(QuicSimpleFrame(expiredStreamDataFrame), pktBuilder);
auto builtOut = std::move(pktBuilder).buildTestPacket();
auto regularPacket = builtOut.first;
EXPECT_EQ(bytesWritten, 4);
ExpiredStreamDataFrame result =
*regularPacket.frames[0].asQuicSimpleFrame()->asExpiredStreamDataFrame();
EXPECT_EQ(id, result.streamId);
EXPECT_EQ(offset, result.minimumStreamOffset);
auto wireBuf = std::move(builtOut.second);
BufQueue queue;
queue.append(wireBuf->clone());
QuicFrame decodedFrame = parseQuicFrame(queue);
QuicSimpleFrame& simpleFrame = *decodedFrame.asQuicSimpleFrame();
ExpiredStreamDataFrame wireExpiredStreamDataFrame =
*simpleFrame.asExpiredStreamDataFrame();
EXPECT_EQ(id, wireExpiredStreamDataFrame.streamId);
EXPECT_EQ(offset, wireExpiredStreamDataFrame.minimumStreamOffset);
// At last, verify there is nothing left in the wire format bytes:
EXPECT_EQ(queue.chainLength(), 0);
}
TEST_F(QuicWriteCodecTest, WriteMinStreamDataFrame) {
MockQuicPacketBuilder pktBuilder;
setupCommonExpects(pktBuilder);
StreamId id = 10;
uint64_t maximumData = 0x64;
uint64_t offset = 0x08;
MinStreamDataFrame minStreamDataFrame(id, maximumData, offset);
auto bytesWritten =
writeFrame(QuicSimpleFrame(minStreamDataFrame), pktBuilder);
auto builtOut = std::move(pktBuilder).buildTestPacket();
auto regularPacket = builtOut.first;
EXPECT_EQ(bytesWritten, 6);
MinStreamDataFrame result =
*regularPacket.frames[0].asQuicSimpleFrame()->asMinStreamDataFrame();
EXPECT_EQ(id, result.streamId);
EXPECT_EQ(maximumData, result.maximumData);
EXPECT_EQ(offset, result.minimumStreamOffset);
auto wireBuf = std::move(builtOut.second);
BufQueue queue;
queue.append(wireBuf->clone());
QuicFrame decodedFrame = parseQuicFrame(queue);
QuicSimpleFrame& simpleFrame = *decodedFrame.asQuicSimpleFrame();
MinStreamDataFrame wireMinStreamDataFrame =
*simpleFrame.asMinStreamDataFrame();
EXPECT_EQ(id, wireMinStreamDataFrame.streamId);
EXPECT_EQ(maximumData, wireMinStreamDataFrame.maximumData);
EXPECT_EQ(offset, wireMinStreamDataFrame.minimumStreamOffset);
// At last, verify there is nothing left in the wire format bytes:
EXPECT_EQ(queue.chainLength(), 0);
}
TEST_F(QuicWriteCodecTest, WritePathChallenge) {
MockQuicPacketBuilder pktBuilder;
setupCommonExpects(pktBuilder);

View File

@@ -109,7 +109,6 @@ class ClientHandshakeTest : public Test, public boost::static_visitor<> {
kDefaultIdleTimeout,
kDefaultAckDelayExponent,
kDefaultUDPSendPacketLen,
kDefaultPartialReliability,
generateStatelessResetToken(),
ConnectionId(std::vector<uint8_t>{0xff, 0xfe, 0xfd, 0xfc}),
ConnectionId(std::vector<uint8_t>()));

View File

@@ -921,118 +921,6 @@ TEST_P(QuicClientTransportIntegrationTest, TestStatelessResetToken) {
EXPECT_EQ(token1.value(), token2.value());
}
TEST_P(QuicClientTransportIntegrationTest, PartialReliabilityDisabledTest) {
expectTransportCallbacks();
TransportSettings settings;
settings.connectUDP = true;
settings.partialReliabilityEnabled = false;
client->setTransportSettings(settings);
TransportSettings serverSettings;
serverSettings.partialReliabilityEnabled = false;
serverSettings.statelessResetTokenSecret = getRandSecret();
server_->setTransportSettings(serverSettings);
client->start(&clientConnCallback);
EXPECT_CALL(clientConnCallback, onTransportReady()).WillOnce(Invoke([&] {
CHECK(client->getConn().oneRttWriteCipher);
eventbase_.terminateLoopSoon();
}));
eventbase_.loopForever();
auto streamId = client->createBidirectionalStream().value();
auto data = IOBuf::copyBuffer("hello");
auto expected = std::shared_ptr<IOBuf>(IOBuf::copyBuffer("echo "));
expected->prependChain(data->clone());
sendRequestAndResponseAndWait(*expected, data->clone(), streamId, &readCb);
EXPECT_FALSE(client->isPartiallyReliableTransport());
}
TEST_P(QuicClientTransportIntegrationTest, PartialReliabilityDisabledTest2) {
expectTransportCallbacks();
TransportSettings settings;
settings.connectUDP = true;
settings.partialReliabilityEnabled = true;
client->setTransportSettings(settings);
TransportSettings serverSettings;
serverSettings.partialReliabilityEnabled = false;
serverSettings.statelessResetTokenSecret = getRandSecret();
server_->setTransportSettings(serverSettings);
client->start(&clientConnCallback);
EXPECT_CALL(clientConnCallback, onTransportReady()).WillOnce(Invoke([&] {
CHECK(client->getConn().oneRttWriteCipher);
eventbase_.terminateLoopSoon();
}));
eventbase_.loopForever();
auto streamId = client->createBidirectionalStream().value();
auto data = IOBuf::copyBuffer("hello");
auto expected = std::shared_ptr<IOBuf>(IOBuf::copyBuffer("echo "));
expected->prependChain(data->clone());
sendRequestAndResponseAndWait(*expected, data->clone(), streamId, &readCb);
EXPECT_FALSE(client->isPartiallyReliableTransport());
}
TEST_P(QuicClientTransportIntegrationTest, PartialReliabilityDisabledTest3) {
expectTransportCallbacks();
TransportSettings settings;
settings.connectUDP = true;
settings.partialReliabilityEnabled = false;
client->setTransportSettings(settings);
TransportSettings serverSettings;
serverSettings.partialReliabilityEnabled = true;
serverSettings.statelessResetTokenSecret = getRandSecret();
server_->setTransportSettings(serverSettings);
client->start(&clientConnCallback);
EXPECT_CALL(clientConnCallback, onTransportReady()).WillOnce(Invoke([&] {
CHECK(client->getConn().oneRttWriteCipher);
eventbase_.terminateLoopSoon();
}));
eventbase_.loopForever();
auto streamId = client->createBidirectionalStream().value();
auto data = IOBuf::copyBuffer("hello");
auto expected = std::shared_ptr<IOBuf>(IOBuf::copyBuffer("echo "));
expected->prependChain(data->clone());
sendRequestAndResponseAndWait(*expected, data->clone(), streamId, &readCb);
EXPECT_FALSE(client->isPartiallyReliableTransport());
}
TEST_P(QuicClientTransportIntegrationTest, PartialReliabilityEnabledTest) {
expectTransportCallbacks();
TransportSettings settings;
settings.connectUDP = true;
settings.partialReliabilityEnabled = true;
client->setTransportSettings(settings);
TransportSettings serverSettings;
serverSettings.partialReliabilityEnabled = true;
serverSettings.statelessResetTokenSecret = getRandSecret();
server_->setTransportSettings(serverSettings);
client->start(&clientConnCallback);
EXPECT_CALL(clientConnCallback, onTransportReady()).WillOnce(Invoke([&] {
CHECK(client->getConn().oneRttWriteCipher);
eventbase_.terminateLoopSoon();
}));
eventbase_.loopForever();
auto streamId = client->createBidirectionalStream().value();
auto data = IOBuf::copyBuffer("hello");
auto expected = std::shared_ptr<IOBuf>(IOBuf::copyBuffer("echo "));
expected->prependChain(data->clone());
sendRequestAndResponseAndWait(*expected, data->clone(), streamId, &readCb);
EXPECT_TRUE(client->isPartiallyReliableTransport());
}
TEST_P(QuicClientTransportIntegrationTest, D6DEnabledTest) {
expectTransportCallbacks();
TransportSettings settings;
@@ -1052,59 +940,6 @@ TEST_P(QuicClientTransportIntegrationTest, D6DEnabledTest) {
eventbase_.loopForever();
}
TEST_P(
QuicClientTransportIntegrationTest,
PartialReliabilityEnableDisableRunTimeTest) {
expectTransportCallbacks();
TransportSettings settings;
settings.connectUDP = true;
settings.partialReliabilityEnabled = true;
client->setTransportSettings(settings);
// Enable PR on server.
TransportSettings serverSettings;
serverSettings.partialReliabilityEnabled = true;
serverSettings.statelessResetTokenSecret = getRandSecret();
server_->setTransportSettings(serverSettings);
client->start(&clientConnCallback);
EXPECT_CALL(clientConnCallback, onTransportReady()).WillOnce(Invoke([&] {
CHECK(client->getConn().oneRttWriteCipher);
eventbase_.terminateLoopSoon();
}));
eventbase_.loopForever();
auto streamId = client->createBidirectionalStream().value();
auto data = IOBuf::copyBuffer("hello");
auto expected = std::shared_ptr<IOBuf>(IOBuf::copyBuffer("echo "));
expected->prependChain(data->clone());
sendRequestAndResponseAndWait(*expected, data->clone(), streamId, &readCb);
// Client successfully negotiated partial reliability on connection.
EXPECT_TRUE(client->isPartiallyReliableTransport());
// Disable PR on server.
server_->enablePartialReliability(false);
// Re-connect.
client = createClient();
expectTransportCallbacks();
client->setTransportSettings(settings);
client->start(&clientConnCallback);
EXPECT_CALL(clientConnCallback, onTransportReady()).WillOnce(Invoke([&] {
CHECK(client->getConn().oneRttWriteCipher);
eventbase_.terminateLoopSoon();
}));
eventbase_.loopForever();
streamId = client->createBidirectionalStream().value();
sendRequestAndResponseAndWait(*expected, data->clone(), streamId, &readCb);
// Second connection should not successfully negotiate partial reliability,
// even though client still has it locally enabled.
EXPECT_FALSE(client->isPartiallyReliableTransport());
}
INSTANTIATE_TEST_CASE_P(
QuicClientTransportIntegrationTests,
QuicClientTransportIntegrationTest,

View File

@@ -441,8 +441,4 @@ MaxStreamDataFrame generateMaxStreamDataFrame(const QuicStreamState& stream) {
return MaxStreamDataFrame(stream.id, calculateMaximumData(stream));
}
MinStreamDataFrame generateMinStreamDataFrame(const QuicStreamState& stream) {
return MinStreamDataFrame(
stream.id, calculateMaximumData(stream), stream.currentReceiveOffset);
}
} // namespace quic

View File

@@ -140,10 +140,4 @@ MaxDataFrame generateMaxDataFrame(const QuicConnectionStateBase& conn);
*/
MaxStreamDataFrame generateMaxStreamDataFrame(const QuicStreamState& stream);
/**
* Generate a new MinStreamDataFrame with the latest flow control state and
* window size of stream.
*/
MinStreamDataFrame generateMinStreamDataFrame(const QuicStreamState& stream);
} // namespace quic

View File

@@ -13,20 +13,6 @@ void addQuicSimpleFrameToEvent(
frame.streamId, frame.errorCode));
break;
}
case quic::QuicSimpleFrame::Type::MinStreamDataFrame: {
const quic::MinStreamDataFrame& frame =
*simpleFrame.asMinStreamDataFrame();
event->frames.push_back(std::make_unique<quic::MinStreamDataFrameLog>(
frame.streamId, frame.maximumData, frame.minimumStreamOffset));
break;
}
case quic::QuicSimpleFrame::Type::ExpiredStreamDataFrame: {
const quic::ExpiredStreamDataFrame& frame =
*simpleFrame.asExpiredStreamDataFrame();
event->frames.push_back(std::make_unique<quic::ExpiredStreamDataFrameLog>(
frame.streamId, frame.minimumStreamOffset));
break;
}
case quic::QuicSimpleFrame::Type::PathChallengeFrame: {
const quic::PathChallengeFrame& frame =
*simpleFrame.asPathChallengeFrame();

View File

@@ -71,10 +71,6 @@ folly::StringPiece toQlogString(FrameType frame) {
case FrameType::CONNECTION_CLOSE:
case FrameType::CONNECTION_CLOSE_APP_ERR:
return "connection_close";
case FrameType::MIN_STREAM_DATA:
return "min_stream_data";
case FrameType::EXPIRED_STREAM_DATA:
return "expired_stream_data";
case FrameType::HANDSHAKE_DONE:
return "handshake_done";
case FrameType::KNOB:

View File

@@ -154,23 +154,6 @@ folly::dynamic StopSendingFrameLog::toDynamic() const {
return d;
}
folly::dynamic MinStreamDataFrameLog::toDynamic() const {
folly::dynamic d = folly::dynamic::object();
d["frame_type"] = toQlogString(FrameType::MIN_STREAM_DATA);
d["stream_id"] = streamId;
d["maximum_data"] = maximumData;
d["minimum_stream_offset"] = minimumStreamOffset;
return d;
}
folly::dynamic ExpiredStreamDataFrameLog::toDynamic() const {
folly::dynamic d = folly::dynamic::object();
d["frame_type"] = toQlogString(FrameType::EXPIRED_STREAM_DATA);
d["stream_id"] = streamId;
d["minimum_stream_offset"] = minimumStreamOffset;
return d;
}
folly::dynamic PathChallengeFrameLog::toDynamic() const {
folly::dynamic d = folly::dynamic::object();
d["frame_type"] = toQlogString(FrameType::PATH_CHALLENGE);

View File

@@ -242,34 +242,6 @@ class StopSendingFrameLog : public QLogFrame {
folly::dynamic toDynamic() const override;
};
class MinStreamDataFrameLog : public QLogFrame {
public:
StreamId streamId;
uint64_t maximumData;
uint64_t minimumStreamOffset;
MinStreamDataFrameLog(
StreamId streamIdIn,
uint64_t maximumDataIn,
uint64_t minimumStreamOffsetIn)
: streamId{streamIdIn},
maximumData{maximumDataIn},
minimumStreamOffset{minimumStreamOffsetIn} {}
~MinStreamDataFrameLog() override = default;
folly::dynamic toDynamic() const override;
};
class ExpiredStreamDataFrameLog : public QLogFrame {
public:
StreamId streamId;
uint64_t minimumStreamOffset;
ExpiredStreamDataFrameLog(StreamId streamIdIn, uint64_t minimumStreamOffsetIn)
: streamId{streamIdIn}, minimumStreamOffset{minimumStreamOffsetIn} {}
~ExpiredStreamDataFrameLog() override = default;
folly::dynamic toDynamic() const override;
};
class PathChallengeFrameLog : public QLogFrame {
public:
uint64_t pathData;

View File

@@ -111,12 +111,6 @@ void markPacketLoss(
// not have the offset.
break;
}
// The original rxmt offset might have been bumped up after it was
// shrunk due to egress partially reliable skip.
if (!streamFrameMatchesRetransmitBuffer(
*stream, frame, *bufferItr->second)) {
break;
}
stream->insertIntoLossBuffer(std::move(bufferItr->second));
stream->retransmissionBuffer.erase(bufferItr);
conn.streamManager->updateLossStreams(*stream);

View File

@@ -28,11 +28,10 @@ namespace quic {
namespace samples {
class EchoClient : public quic::QuicSocket::ConnectionCallback,
public quic::QuicSocket::ReadCallback,
public quic::QuicSocket::WriteCallback,
public quic::QuicSocket::DataExpiredCallback {
public quic::QuicSocket::WriteCallback {
public:
EchoClient(const std::string& host, uint16_t port, bool prEnabled = false)
: host_(host), port_(port), prEnabled_(prEnabled) {}
EchoClient(const std::string& host, uint16_t port)
: host_(host), port_(port) {}
void readAvailable(quic::StreamId streamId) noexcept override {
auto readData = quicClient_->read(streamId, 0);
@@ -106,12 +105,6 @@ class EchoClient : public quic::QuicSocket::ConnectionCallback,
<< " error=" << toString(error);
}
void onDataExpired(StreamId streamId, uint64_t newOffset) noexcept override {
LOG(INFO) << "Client received skipData; "
<< newOffset - recvOffsets_[streamId]
<< " bytes skipped on stream=" << streamId;
}
void start() {
folly::ScopedEventBaseThread networkThread("EchoClientThread");
auto evb = networkThread.getEventBase();
@@ -129,9 +122,6 @@ class EchoClient : public quic::QuicSocket::ConnectionCallback,
quicClient_->addNewPeerAddress(addr);
TransportSettings settings;
if (prEnabled_) {
settings.partialReliabilityEnabled = true;
}
quicClient_->setTransportSettings(settings);
quicClient_->setTransportStatsCallback(
@@ -154,9 +144,6 @@ class EchoClient : public quic::QuicSocket::ConnectionCallback,
// create new stream for each message
auto streamId = client->createBidirectionalStream().value();
client->setReadCallback(streamId, this);
if (prEnabled_) {
client->setDataExpiredCallback(streamId, this);
}
pendingOutput_[streamId].append(folly::IOBuf::copyBuffer(message));
sendMessage(streamId, pendingOutput_[streamId]);
});
@@ -183,7 +170,6 @@ class EchoClient : public quic::QuicSocket::ConnectionCallback,
std::string host_;
uint16_t port_;
bool prEnabled_;
std::shared_ptr<quic::QuicClientTransport> quicClient_;
std::map<quic::StreamId, BufQueue> pendingOutput_;
std::map<quic::StreamId, uint64_t> recvOffsets_;

View File

@@ -21,8 +21,7 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
public:
using StreamData = std::pair<BufQueue, bool>;
explicit EchoHandler(folly::EventBase* evbIn, bool prEnabled = false)
: evb(evbIn), prEnabled_(prEnabled) {}
explicit EchoHandler(folly::EventBase* evbIn) : evb(evbIn) {}
void setQuicSocket(std::shared_ptr<quic::QuicSocket> socket) {
sock = socket;
@@ -75,11 +74,7 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
input_[id].first.append(std::move(data));
input_[id].second = eof;
if (eof) {
if (prEnabled_) {
skipSomeEchoSome(id, input_[id]);
} else {
echo(id, input_[id]);
}
echo(id, input_[id]);
}
}
@@ -110,45 +105,6 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
}
}
/**
* Skips first part, echoes the second part.
* E.g. "batman" -> "???man"
*/
void skipSomeEchoSome(quic::StreamId id, StreamData& data) {
if (!data.second) {
// only echo when eof is present
return;
}
auto& originalData = data.first;
auto dataLen = originalData.chainLength();
auto toSplit = dataLen / 2;
if (toSplit > 0) {
auto skipRes = sock->sendDataExpired(id, toSplit);
if (skipRes.hasError()) {
LOG(ERROR) << "skip error=" << toString(skipRes.error());
} else {
auto v = skipRes.value();
if (v.has_value()) {
LOG(INFO) << "new offset = " << v.value();
} else {
LOG(INFO) << "new offset doesn't have value";
}
}
}
originalData.splitAtMost(toSplit);
auto res = sock->writeChain(id, originalData.move(), true, nullptr);
if (res.hasError()) {
LOG(ERROR) << "write error=" << toString(res.error());
} else {
// echo is done, clear EOF
data.second = false;
}
}
void onStreamWriteReady(quic::StreamId id, uint64_t maxToSend) noexcept
override {
LOG(INFO) << "socket is write ready with maxToSend=" << maxToSend;
@@ -172,7 +128,6 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
private:
std::map<quic::StreamId, StreamData> input_;
bool prEnabled_;
};
} // namespace samples
} // namespace quic

View File

@@ -35,7 +35,7 @@ class EchoServerTransportFactory : public quic::QuicServerTransportFactory {
}
}
EchoServerTransportFactory(bool prEnabled = false) : prEnabled_(prEnabled) {}
EchoServerTransportFactory() = default;
quic::QuicServerTransport::Ptr make(
folly::EventBase* evb,
@@ -44,7 +44,7 @@ class EchoServerTransportFactory : public quic::QuicServerTransportFactory {
std::shared_ptr<const fizz::server::FizzServerContext> ctx) noexcept
override {
CHECK_EQ(evb, sock->getEventBase());
auto echoHandler = std::make_unique<EchoHandler>(evb, prEnabled_);
auto echoHandler = std::make_unique<EchoHandler>(evb);
auto transport = quic::QuicServerTransport::make(
evb, std::move(sock), *echoHandler, ctx);
echoHandler->setQuicSocket(transport);
@@ -55,31 +55,19 @@ class EchoServerTransportFactory : public quic::QuicServerTransportFactory {
std::vector<std::unique_ptr<EchoHandler>> echoHandlers_;
private:
bool prEnabled_;
};
class EchoServer {
public:
explicit EchoServer(
const std::string& host = "::1",
uint16_t port = 6666,
bool prEnabled = false)
: host_(host),
port_(port),
prEnabled_(prEnabled),
server_(QuicServer::createQuicServer()) {
explicit EchoServer(const std::string& host = "::1", uint16_t port = 6666)
: host_(host), port_(port), server_(QuicServer::createQuicServer()) {
server_->setQuicServerTransportFactory(
std::make_unique<EchoServerTransportFactory>(prEnabled_));
std::make_unique<EchoServerTransportFactory>());
server_->setTransportStatsCallbackFactory(
std::make_unique<LogQuicStatsFactory>());
auto serverCtx = quic::test::createServerCtx();
serverCtx->setClock(std::make_shared<fizz::SystemClock>());
server_->setFizzContext(serverCtx);
if (prEnabled_) {
TransportSettings settings;
settings.partialReliabilityEnabled = true;
server_->setTransportSettings(settings);
}
}
void start() {
@@ -94,7 +82,6 @@ class EchoServer {
private:
std::string host_;
uint16_t port_;
bool prEnabled_;
folly::EventBase eventbase_;
std::shared_ptr<quic::QuicServer> server_;
};

View File

@@ -18,7 +18,6 @@
DEFINE_string(host, "::1", "Echo server hostname/IP");
DEFINE_int32(port, 6666, "Echo server port");
DEFINE_string(mode, "server", "Mode to run in: 'client' or 'server'");
DEFINE_bool(pr, false, "Enable partially realible mode");
using namespace quic::samples;
@@ -33,14 +32,14 @@ int main(int argc, char* argv[]) {
fizz::CryptoUtils::init();
if (FLAGS_mode == "server") {
EchoServer server(FLAGS_host, FLAGS_port, FLAGS_pr);
EchoServer server(FLAGS_host, FLAGS_port);
server.start();
} else if (FLAGS_mode == "client") {
if (FLAGS_host.empty() || FLAGS_port == 0) {
LOG(ERROR) << "EchoClient expected --host and --port";
return -2;
}
EchoClient client(FLAGS_host, FLAGS_port, FLAGS_pr);
EchoClient client(FLAGS_host, FLAGS_port);
client.start();
} else {
LOG(ERROR) << "Unknown mode specified: " << FLAGS_mode;

View File

@@ -633,13 +633,6 @@ void QuicServer::rejectNewConnections(bool reject) {
[reject](auto worker) mutable { worker->rejectNewConnections(reject); });
}
void QuicServer::enablePartialReliability(bool enabled) {
transportSettings_.partialReliabilityEnabled = enabled;
runOnAllWorkers([enabled](auto worker) mutable {
worker->enablePartialReliability(enabled);
});
}
void QuicServer::setEventBaseObserver(
std::shared_ptr<folly::EventBaseObserver> observer) {
if (shutdown_ || workerEvbs_.empty()) {

View File

@@ -1069,10 +1069,6 @@ void QuicServerWorker::rejectNewConnections(bool rejectNewConnections) {
rejectNewConnections_ = rejectNewConnections;
}
void QuicServerWorker::enablePartialReliability(bool enabled) {
transportSettings_.partialReliabilityEnabled = enabled;
}
void QuicServerWorker::setHealthCheckToken(
const std::string& healthCheckToken) {
healthCheckToken_ = folly::IOBuf::copyBuffer(healthCheckToken);

View File

@@ -27,7 +27,6 @@ class ServerTransportParametersExtension : public fizz::ServerExtensions {
std::chrono::milliseconds idleTimeout,
uint64_t ackDelayExponent,
uint64_t maxRecvPacketSize,
TransportPartialReliabilitySetting partialReliability,
const StatelessResetToken& token,
ConnectionId initialSourceCid,
ConnectionId originalDestinationCid)
@@ -41,7 +40,6 @@ class ServerTransportParametersExtension : public fizz::ServerExtensions {
idleTimeout_(idleTimeout),
ackDelayExponent_(ackDelayExponent),
maxRecvPacketSize_(maxRecvPacketSize),
partialReliability_(partialReliability),
token_(token),
initialSourceCid_(initialSourceCid),
originalDestinationCid_(originalDestinationCid) {}
@@ -95,14 +93,6 @@ class ServerTransportParametersExtension : public fizz::ServerExtensions {
statelessReset.value = folly::IOBuf::copyBuffer(token_);
params.parameters.push_back(std::move(statelessReset));
uint64_t partialReliabilitySetting = 0;
if (partialReliability_) {
partialReliabilitySetting = 1;
}
params.parameters.push_back(encodeIntegerParameter(
static_cast<TransportParameterId>(kPartialReliabilityParameterId),
partialReliabilitySetting));
if (encodingVersion_ == QuicVersion::QUIC_DRAFT) {
params.parameters.push_back(encodeConnIdParameter(
TransportParameterId::initial_source_connection_id,
@@ -128,7 +118,6 @@ class ServerTransportParametersExtension : public fizz::ServerExtensions {
std::chrono::milliseconds idleTimeout_;
uint64_t ackDelayExponent_;
uint64_t maxRecvPacketSize_;
TransportPartialReliabilitySetting partialReliability_;
folly::Optional<ClientTransportParameters> clientTransportParameters_;
StatelessResetToken token_;
ConnectionId initialSourceCid_;

View File

@@ -128,7 +128,6 @@ class ServerHandshakeTest : public Test {
kDefaultIdleTimeout,
kDefaultAckDelayExponent,
kDefaultUDPSendPacketLen,
kDefaultPartialReliability,
generateStatelessResetToken(),
ConnectionId(std::vector<uint8_t>{0xff, 0xfe, 0xfd, 0xfc}),
ConnectionId(std::vector<uint8_t>()));

View File

@@ -45,7 +45,6 @@ TEST(ServerTransportParametersTest, TestGetExtensions) {
kDefaultIdleTimeout,
kDefaultAckDelayExponent,
kDefaultUDPSendPacketLen,
kDefaultPartialReliability,
generateStatelessResetToken(),
ConnectionId(std::vector<uint8_t>{0xff, 0xfe, 0xfd, 0xfc}),
ConnectionId(std::vector<uint8_t>()));
@@ -68,7 +67,6 @@ TEST(ServerTransportParametersTest, TestGetExtensionsMissingClientParams) {
kDefaultIdleTimeout,
kDefaultAckDelayExponent,
kDefaultUDPSendPacketLen,
kDefaultPartialReliability,
generateStatelessResetToken(),
ConnectionId(std::vector<uint8_t>{0xff, 0xfe, 0xfd, 0xfc}),
ConnectionId(std::vector<uint8_t>()));

View File

@@ -121,9 +121,6 @@ void processClientInitialParams(
TransportParameterId::ack_delay_exponent, clientParams.parameters);
auto packetSize = getIntegerParameter(
TransportParameterId::max_packet_size, clientParams.parameters);
auto partialReliability = getIntegerParameter(
static_cast<TransportParameterId>(kPartialReliabilityParameterId),
clientParams.parameters);
auto activeConnectionIdLimit = getIntegerParameter(
TransportParameterId::active_connection_id_limit,
clientParams.parameters);
@@ -215,13 +212,6 @@ void processClientInitialParams(
conn.peerActiveConnectionIdLimit =
activeConnectionIdLimit.value_or(kDefaultActiveConnectionIdLimit);
if (partialReliability && *partialReliability != 0 &&
conn.transportSettings.partialReliabilityEnabled) {
conn.partialReliabilityEnabled = true;
}
VLOG(10) << "conn.partialReliabilityEnabled="
<< conn.partialReliabilityEnabled;
if (conn.transportSettings.d6dConfig.enabled) {
// Sanity check
if (d6dBasePMTU) {
@@ -674,7 +664,6 @@ void onServerReadDataFromOpen(
conn.transportSettings.idleTimeout,
conn.transportSettings.ackDelayExponent,
conn.transportSettings.maxRecvPacketSize,
conn.transportSettings.partialReliabilityEnabled,
*newServerConnIdData->token,
conn.serverConnectionId.value(),
initialDestinationConnectionId));

View File

@@ -23,7 +23,6 @@
#include <quic/server/handshake/ServerHandshakeFactory.h>
#include <quic/server/state/ServerConnectionIdRejector.h>
#include <quic/state/AckHandlers.h>
#include <quic/state/QPRFunctions.h>
#include <quic/state/QuicStateFunctions.h>
#include <quic/state/QuicStreamFunctions.h>
#include <quic/state/SimpleFrameFunctions.h>

View File

@@ -207,7 +207,6 @@ target_compile_options(
add_dependencies(
mvfst_state_simple_frame_functions
mvfst_state_qpr_functions
mvfst_state_functions
mvfst_state_machine
mvfst_codec_types
@@ -216,51 +215,11 @@ add_dependencies(
target_link_libraries(
mvfst_state_simple_frame_functions PUBLIC
Folly::folly
mvfst_state_qpr_functions
mvfst_state_functions
mvfst_state_machine
mvfst_codec_types
)
# qpr function
add_library(
mvfst_state_qpr_functions
QPRFunctions.cpp
)
target_include_directories(
mvfst_state_qpr_functions PUBLIC
$<BUILD_INTERFACE:${QUIC_FBCODE_ROOT}>
$<INSTALL_INTERFACE:include/>
)
target_compile_options(
mvfst_state_qpr_functions
PRIVATE
${_QUIC_COMMON_COMPILE_OPTIONS}
)
add_dependencies(
mvfst_state_qpr_functions
mvfst_bufutil
mvfst_state_machine
mvfst_state_stream_functions
mvfst_codec_types
mvfst_flowcontrol
)
target_link_libraries(
mvfst_state_qpr_functions PUBLIC
Folly::folly
mvfst_bufutil
mvfst_state_machine
mvfst_state_stream_functions
mvfst_codec_types
mvfst_flowcontrol
)
add_library(
mvfst_state_stream STATIC
stream/StreamStateFunctions.cpp
@@ -346,12 +305,6 @@ install(
DESTINATION lib
)
install(
TARGETS mvfst_state_qpr_functions
EXPORT mvfst-exports
DESTINATION lib
)
install(
TARGETS mvfst_state_stream
EXPORT mvfst-exports

View File

@@ -1,265 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
#include <quic/state/QPRFunctions.h>
#include <quic/flowcontrol/QuicFlowController.h>
#include <quic/state/QuicStreamFunctions.h>
#include <quic/state/QuicStreamUtilities.h>
namespace quic {
namespace {
// shrink the buffers until offset, either by popping up or trimming from start
void shrinkBuffers(std::deque<StreamBuffer>& buffers, uint64_t offset) {
while (!buffers.empty()) {
auto curr = buffers.begin();
if (curr->offset >= offset) {
// the buffers are supposed to be sorted, so we are done here
break;
}
size_t currSize = curr->data.chainLength();
if (curr->offset + currSize <= offset) {
buffers.pop_front();
} else {
uint64_t amount = offset - curr->offset;
curr->data.trimStartAtMost(amount);
curr->offset += amount;
break;
}
}
}
void shrinkBuffers(
folly::F14FastMap<uint64_t, std::unique_ptr<StreamBuffer>>& buffers,
uint64_t offset) {
// Do a linear search of the entire buffer, there can be exactly one trimmed
// buffer, since we are changing the offset for that single buffer we need to
// change the offset in the StreamBuffer, but keep it keyed on the same
// offset as before so we still remove it on ack.
for (auto itr = buffers.begin(); itr != buffers.end();) {
if (itr->second->offset >= offset) {
itr++;
continue;
}
if (itr->second->offset + itr->second->data.chainLength() <= offset) {
itr = buffers.erase(itr);
} else {
uint64_t amount = offset - itr->second->offset;
itr->second->data.trimStartAtMost(amount);
itr->second->offset += amount;
itr++;
}
}
}
void shrinkRetransmittableBuffers(
QuicStreamState* stream,
uint64_t minimumRetransmittableOffset) {
if (minimumRetransmittableOffset > stream->currentWriteOffset) {
uint64_t amount = minimumRetransmittableOffset - stream->currentWriteOffset;
auto trimmed = stream->writeBuffer.trimStartAtMost(amount);
stream->currentWriteOffset = minimumRetransmittableOffset;
// pretends we sent extra bits here as we moved currentWriteOffset
updateFlowControlOnWriteToStream(*stream, amount - trimmed);
updateFlowControlOnWriteToSocket(*stream, amount);
maybeWriteBlockAfterSocketWrite(*stream);
stream->conn.streamManager->updateWritableStreams(*stream);
}
VLOG(10) << __func__ << ": shrinking retransmissionBuffer to "
<< minimumRetransmittableOffset;
shrinkBuffers(stream->retransmissionBuffer, minimumRetransmittableOffset);
shrinkBuffers(stream->lossBuffer, minimumRetransmittableOffset);
}
void shrinkReadBuffer(QuicStreamState* stream) {
if (stream->currentReceiveOffset > 0) {
uint64_t offsetSeen = stream->currentReceiveOffset - 1;
updateFlowControlOnStreamData(
*stream, stream->maxOffsetObserved, offsetSeen);
stream->maxOffsetObserved = std::max(stream->maxOffsetObserved, offsetSeen);
}
uint64_t lastReadOffset = stream->currentReadOffset;
stream->currentReadOffset = stream->currentReceiveOffset;
shrinkBuffers(stream->readBuffer, stream->currentReadOffset);
// pretends we read stream.currentReadOffset - lastReadOffset bytes
updateFlowControlOnRead(*stream, lastReadOffset, Clock::now());
// may become readable after shrink
stream->conn.streamManager->updateReadableStreams(*stream);
stream->conn.streamManager->updatePeekableStreams(*stream);
}
} // namespace
folly::Optional<uint64_t> advanceCurrentReceiveOffset(
QuicStreamState* stream,
uint64_t offset) {
if (offset <= stream->currentReceiveOffset ||
offset <= stream->currentReadOffset) {
return folly::none;
}
// do not go beyond EOF offset
if (stream->finalReadOffset) {
offset = std::min(offset, *stream->finalReadOffset);
}
stream->currentReceiveOffset = offset;
shrinkReadBuffer(stream);
// Check if we have a pending MinStreamDataFrame for this stream
auto& frames = stream->conn.pendingEvents.frames;
auto it = find_if(frames.begin(), frames.end(), [&](QuicSimpleFrame& frame) {
MinStreamDataFrame* minStreamData = frame.asMinStreamDataFrame();
if (!minStreamData) {
return false;
}
return minStreamData->streamId == stream->id;
});
MinStreamDataFrame minStreamDataFrame = generateMinStreamDataFrame(*stream);
if (it == frames.end()) {
frames.emplace_back(minStreamDataFrame);
} else {
// update existing pending MinStreamDataFrame
MinStreamDataFrame& frame = *it->asMinStreamDataFrame();
frame = minStreamDataFrame;
}
return offset;
}
void onRecvMinStreamDataFrame(
QuicStreamState* stream,
const MinStreamDataFrame& frame,
PacketNum packetNum) {
if (isReceivingStream(stream->conn.nodeType, stream->id) ||
(isSendingStream(stream->conn.nodeType, stream->id) &&
stream->sendState != StreamSendState::Open)) {
throw QuicTransportException(
"MinStreamDataFrame on receiving-only stream or "
"sending-only stream but not opened",
TransportErrorCode::PROTOCOL_VIOLATION);
}
if (frame.maximumData < frame.minimumStreamOffset) {
throw QuicTransportException(
"Invalid data",
TransportErrorCode::FRAME_ENCODING_ERROR,
FrameType::MIN_STREAM_DATA);
}
if (frame.minimumStreamOffset <= stream->minimumRetransmittableOffset) {
// nothing to do
return;
}
handleStreamWindowUpdate(*stream, frame.maximumData, packetNum);
uint64_t minimumStreamOffset = frame.minimumStreamOffset;
// do not go beyond EOF offset
if (stream->finalWriteOffset) {
minimumStreamOffset =
std::min(minimumStreamOffset, *stream->finalWriteOffset);
}
stream->minimumRetransmittableOffset = minimumStreamOffset;
shrinkRetransmittableBuffers(stream, stream->minimumRetransmittableOffset);
// remove the stale pending ExpiredStreamDataFrame if exists
auto& frames = stream->conn.pendingEvents.frames;
auto it = find_if(frames.begin(), frames.end(), [&](QuicSimpleFrame& frame) {
ExpiredStreamDataFrame* expiredFrame = frame.asExpiredStreamDataFrame();
if (!expiredFrame) {
return false;
}
return expiredFrame->minimumStreamOffset <=
stream->minimumRetransmittableOffset;
});
if (it != frames.end()) {
frames.erase(it);
}
stream->conn.streamManager->addDataRejected(stream->id);
}
folly::Optional<uint64_t> advanceMinimumRetransmittableOffset(
QuicStreamState* stream,
uint64_t minimumStreamOffset) {
if (minimumStreamOffset <= stream->minimumRetransmittableOffset) {
return folly::none;
}
// take flow control into consideration
minimumStreamOffset = std::min(
minimumStreamOffset, stream->flowControlState.peerAdvertisedMaxOffset);
// do not go beyond EOF offset
if (stream->finalWriteOffset) {
minimumStreamOffset =
std::min(minimumStreamOffset, *stream->finalWriteOffset);
}
stream->minimumRetransmittableOffset = minimumStreamOffset;
shrinkRetransmittableBuffers(stream, stream->minimumRetransmittableOffset);
auto& frames = stream->conn.pendingEvents.frames;
auto it = find_if(frames.begin(), frames.end(), [&](QuicSimpleFrame& frame) {
ExpiredStreamDataFrame* expiredFrame = frame.asExpiredStreamDataFrame();
if (!expiredFrame) {
return false;
}
return expiredFrame->streamId == stream->id;
});
if (it == frames.end()) {
frames.emplace_back(
ExpiredStreamDataFrame(stream->id, minimumStreamOffset));
} else {
// update the existing pending ExpiredStreamDataFrame
ExpiredStreamDataFrame& frame = *it->asExpiredStreamDataFrame();
frame.minimumStreamOffset = minimumStreamOffset;
}
return minimumStreamOffset;
}
void onRecvExpiredStreamDataFrame(
QuicStreamState* stream,
const ExpiredStreamDataFrame& frame) {
if (isSendingStream(stream->conn.nodeType, stream->id)) {
throw QuicTransportException(
"ExpiredStreamDataFrame on unidirectional sending stream",
TransportErrorCode::PROTOCOL_VIOLATION);
}
// ignore the frames that don't advance the offset.
// this may happen due to loss and reordering
if (frame.minimumStreamOffset <= stream->currentReceiveOffset ||
frame.minimumStreamOffset <= stream->currentReadOffset) {
return;
}
uint64_t minimumStreamOffset = frame.minimumStreamOffset;
// do not go beyond EOF offset
if (stream->finalReadOffset) {
minimumStreamOffset =
std::min(minimumStreamOffset, *stream->finalReadOffset);
}
stream->currentReceiveOffset = minimumStreamOffset;
shrinkReadBuffer(stream);
// remove the stale pending MinStreamDataFrame if exists
auto& frames = stream->conn.pendingEvents.frames;
auto it = find_if(frames.begin(), frames.end(), [&](QuicSimpleFrame& frame) {
MinStreamDataFrame* minStreamData = frame.asMinStreamDataFrame();
if (!minStreamData) {
return false;
}
return minStreamData->minimumStreamOffset <= stream->currentReceiveOffset;
});
if (it != frames.end()) {
frames.erase(it);
}
stream->conn.streamManager->addDataExpired(stream->id);
}
} // namespace quic

View File

@@ -1,44 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
#pragma once
#include <quic/codec/Types.h>
#include <quic/state/StateData.h>
namespace quic {
/**
* advance the currentReceiveOffset for a stream
*/
folly::Optional<uint64_t> advanceCurrentReceiveOffset(
QuicStreamState* stream,
uint64_t offset);
/**
* processing upon receipt of MinStreamDataFrame
*/
void onRecvMinStreamDataFrame(
QuicStreamState* stream,
const MinStreamDataFrame& frame,
PacketNum packetNum);
/**
* advance the minimum retransmittable offset for a stream
*/
folly::Optional<uint64_t> advanceMinimumRetransmittableOffset(
QuicStreamState* stream,
uint64_t minimumRetransmittableOffset);
/**
* processing upon receipt of ExpiredStreamDataFrame
*/
void onRecvExpiredStreamDataFrame(
QuicStreamState* stream,
const ExpiredStreamDataFrame& frame);
} // namespace quic

View File

@@ -450,54 +450,4 @@ void processCryptoStreamAck(
}
cryptoStream.retransmissionBuffer.erase(ackedBuffer);
}
bool streamFrameMatchesRetransmitBuffer(
const QuicStreamState& stream,
const WriteStreamFrame& streamFrame,
const StreamBuffer& buf) {
// There are 3 possible situations.
// 1) Fully reliable mode: the buffer's and stream frame's offsets and lengths
// must match.
// 2) Partially reliable mode: the retransmit queue buffer has been
// fully removed by an egress skip before the stream frame arrived.
// 3) Partially reliable mode: the retransmit queue buffer was only
// partially trimmed.
// In this case, the retransmit buffer offset must be >= stream frame
// offset and retransmit buffer length must be <= stream frame len field
// vale. Also, the retransmit buffer [offset + length] must match stream
// frame [offset + length].
bool match = false;
if (stream.conn.partialReliabilityEnabled) {
auto frameRightOffset = streamFrame.offset + streamFrame.len;
if (frameRightOffset > buf.offset) {
// There is overlap, buffer fully or partially matches.
DCHECK(buf.offset >= streamFrame.offset);
DCHECK(buf.data.chainLength() <= streamFrame.len);
// The offsets and lengths in the stream frame and buffer may be
// different, but their sum should stay the same (e.g. offset grows,
// length shrinks but sum must be the same).
//
// Example: let's say we send data buf with offset=0 and len=11 and we
// save a copy in retransmission queue. Then we send egress skip to offset
// 6 and that trims that buf copy in retransmission queue to offset=6 and
// len=5. Then we get an ACK for the original buf we sent with old
// offset=0 and len=11 and comparing it to the already trimmed buf. The
// offsets and lengths are going to be different, but their sum will be
// the same.
DCHECK_EQ(
buf.offset + buf.data.chainLength(),
streamFrame.offset + streamFrame.len);
DCHECK_EQ(buf.eof, streamFrame.fin);
match = true;
} // else frameRightOffset <= buf.offset { ignore }
} else {
DCHECK_EQ(buf.offset, streamFrame.offset);
DCHECK_EQ(buf.data.chainLength(), streamFrame.len);
DCHECK_EQ(buf.eof, streamFrame.fin);
match = true;
}
return match;
}
} // namespace quic

View File

@@ -148,12 +148,4 @@ void processCryptoStreamAck(
uint64_t offset,
uint64_t len);
/**
* Checks if stream frame matches buffer from the retransmit queue.
*
*/
bool streamFrameMatchesRetransmitBuffer(
const QuicStreamState& stream,
const WriteStreamFrame& ackFrame,
const StreamBuffer& buf);
} // namespace quic

View File

@@ -447,8 +447,6 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
lossStreams_.erase(streamId);
stopSendingStreams_.erase(streamId);
flowControlUpdated_.erase(streamId);
dataRejectedStreams_.erase(streamId);
dataExpiredStreams_.erase(streamId);
if (it->second.isControl) {
DCHECK_GT(numControlStreams_, 0);
numControlStreams_--;

View File

@@ -523,49 +523,6 @@ class QuicStreamManager {
return txStreams_.count(streamId) > 0;
}
/*
* Returns a const reference to the underlying data rejected streams
* container.
*/
const auto& dataRejectedStreams() const {
return dataRejectedStreams_;
}
/*
* Add a data rejected stream.
*/
void addDataRejected(StreamId streamId) {
dataRejectedStreams_.insert(streamId);
}
/*
* Returns a const reference to the underlying data expired streams container.
*/
const auto& dataExpiredStreams() const {
return dataExpiredStreams_;
}
/*
* Clear the data rejected streams.
*/
void clearDataRejected() {
dataRejectedStreams_.clear();
}
/*
* Add a data expired stream.
*/
void addDataExpired(StreamId streamId) {
dataExpiredStreams_.insert(streamId);
}
/*
* Clear the data expired streams.
*/
void clearDataExpired() {
dataExpiredStreams_.clear();
}
// TODO figure out a better interface here.
/*
* Returns a mutable reference to the underlying readable streams container.
@@ -763,8 +720,6 @@ class QuicStreamManager {
readableStreams_.clear();
peekableStreams_.clear();
flowControlUpdated_.clear();
dataExpiredStreams_.clear();
dataRejectedStreams_.clear();
}
bool isAppIdle() const;
@@ -869,12 +824,6 @@ class QuicStreamManager {
// Map of streams where the peer was asked to stop sending
folly::F14FastMap<StreamId, ApplicationErrorCode> stopSendingStreams_;
// Set of streams that have expired data
folly::F14FastSet<StreamId> dataExpiredStreams_;
// Set of streams that have rejected data
folly::F14FastSet<StreamId> dataRejectedStreams_;
// Streams that had their stream window change and potentially need a window
// update sent
folly::F14FastSet<StreamId> windowUpdates_;

View File

@@ -28,18 +28,6 @@ folly::Optional<QuicSimpleFrame> updateSimpleFrameOnPacketClone(
return folly::none;
}
return QuicSimpleFrame(frame);
case QuicSimpleFrame::Type::MinStreamDataFrame:
if (!conn.streamManager->streamExists(
frame.asMinStreamDataFrame()->streamId)) {
return folly::none;
}
return QuicSimpleFrame(frame);
case QuicSimpleFrame::Type::ExpiredStreamDataFrame:
if (!conn.streamManager->streamExists(
frame.asExpiredStreamDataFrame()->streamId)) {
return folly::none;
}
return QuicSimpleFrame(frame);
case QuicSimpleFrame::Type::PathChallengeFrame:
// Path validation timer expired, path validation failed;
// or a different path validation was scheduled
@@ -95,24 +83,6 @@ void updateSimpleFrameOnPacketLoss(
}
break;
}
case QuicSimpleFrame::Type::MinStreamDataFrame: {
const MinStreamDataFrame& minStreamData = *frame.asMinStreamDataFrame();
auto stream = conn.streamManager->getStream(minStreamData.streamId);
if (stream && stream->conn.partialReliabilityEnabled) {
advanceCurrentReceiveOffset(stream, minStreamData.minimumStreamOffset);
}
break;
}
case QuicSimpleFrame::Type::ExpiredStreamDataFrame: {
const ExpiredStreamDataFrame& expiredFrame =
*frame.asExpiredStreamDataFrame();
auto stream = conn.streamManager->getStream(expiredFrame.streamId);
if (stream && stream->conn.partialReliabilityEnabled) {
advanceMinimumRetransmittableOffset(
stream, expiredFrame.minimumStreamOffset);
}
break;
}
case QuicSimpleFrame::Type::PathChallengeFrame: {
const PathChallengeFrame& pathChallenge = *frame.asPathChallengeFrame();
if (conn.outstandingPathValidation &&
@@ -143,7 +113,7 @@ void updateSimpleFrameOnPacketLoss(
bool updateSimpleFrameOnPacketReceived(
QuicConnectionStateBase& conn,
const QuicSimpleFrame& frame,
PacketNum packetNum,
PacketNum /*packetNum*/,
bool fromChangedPeerAddress) {
switch (frame.type()) {
case QuicSimpleFrame::Type::StopSendingFrame: {
@@ -154,23 +124,6 @@ bool updateSimpleFrameOnPacketReceived(
}
return true;
}
case QuicSimpleFrame::Type::MinStreamDataFrame: {
const MinStreamDataFrame& minStreamData = *frame.asMinStreamDataFrame();
auto stream = conn.streamManager->getStream(minStreamData.streamId);
if (stream && stream->conn.partialReliabilityEnabled) {
onRecvMinStreamDataFrame(stream, minStreamData, packetNum);
}
return true;
}
case QuicSimpleFrame::Type::ExpiredStreamDataFrame: {
const ExpiredStreamDataFrame& expiredStreamData =
*frame.asExpiredStreamDataFrame();
auto stream = conn.streamManager->getStream(expiredStreamData.streamId);
if (stream && stream->conn.partialReliabilityEnabled) {
onRecvExpiredStreamDataFrame(stream, expiredStreamData);
}
return true;
}
case QuicSimpleFrame::Type::PathChallengeFrame: {
bool rotatedId = conn.retireAndSwitchPeerConnectionIds();
if (!rotatedId) {

View File

@@ -9,7 +9,6 @@
#pragma once
#include <quic/codec/Types.h>
#include <quic/state/QPRFunctions.h>
#include <quic/state/StateData.h>
namespace quic {

View File

@@ -821,9 +821,6 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
// Whether a connection can be paced based on its handshake and close states.
// For example, we may not want to pace a connection that's still handshaking.
bool canBePaced{false};
// Whether or not both ends agree to use partial reliability
bool partialReliabilityEnabled{false};
};
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st);

View File

@@ -204,8 +204,6 @@ struct TransportSettings {
// the callback registered through notifyPendingWriteOnConnection() will
// not be called
uint64_t totalBufferSpaceAvailable{kDefaultBufferSpaceAvailable};
// Whether or not to advertise partial reliability capability
bool partialReliabilityEnabled{false};
// Whether the endpoint allows peer to migrate to new address
bool disableMigration{true};
// Whether or not the socket should gracefully drain on close

View File

@@ -100,24 +100,15 @@ void sendAckSMHandler(
// Clean up the acked buffers from the retransmissionBuffer.
auto ackedBuffer = stream.retransmissionBuffer.find(ackedFrame.offset);
if (ackedBuffer != stream.retransmissionBuffer.end()) {
if (streamFrameMatchesRetransmitBuffer(
stream, ackedFrame, *ackedBuffer->second)) {
VLOG(10) << "Open: acked stream data stream=" << stream.id
<< " offset=" << ackedBuffer->second->offset
<< " len=" << ackedBuffer->second->data.chainLength()
<< " eof=" << ackedBuffer->second->eof << " " << stream.conn;
stream.ackedIntervals.insert(
ackedBuffer->second->offset,
ackedBuffer->second->offset +
ackedBuffer->second->data.chainLength());
stream.retransmissionBuffer.erase(ackedBuffer);
} else {
VLOG(10)
<< "Open: received an ack for already discarded buffer; stream="
<< stream.id << " offset=" << ackedBuffer->second->offset
<< " len=" << ackedBuffer->second->data.chainLength()
<< " eof=" << ackedBuffer->second->eof << " " << stream.conn;
}
VLOG(10) << "Open: acked stream data stream=" << stream.id
<< " offset=" << ackedBuffer->second->offset
<< " len=" << ackedBuffer->second->data.chainLength()
<< " eof=" << ackedBuffer->second->eof << " " << stream.conn;
stream.ackedIntervals.insert(
ackedBuffer->second->offset,
ackedBuffer->second->offset +
ackedBuffer->second->data.chainLength());
stream.retransmissionBuffer.erase(ackedBuffer);
}
// This stream may be able to invoke some deliveryCallbacks:

View File

@@ -284,155 +284,6 @@ TEST_F(QuicOpenStateTest, RetxBufferSortedAfterAck) {
EXPECT_EQ(2, stream->retransmissionBuffer.size());
}
TEST_F(QuicOpenStateTest, AckStreamAfterSkip) {
auto conn = createConn();
conn->partialReliabilityEnabled = true;
auto stream = conn->streamManager->createNextBidirectionalStream().value();
folly::Optional<ConnectionId> serverChosenConnId = *conn->clientConnectionId;
serverChosenConnId.value().data()[0] ^= 0x01;
EventBase evb;
auto sock = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb);
auto buf = IOBuf::copyBuffer("hello");
writeQuicPacket(
*conn,
*conn->clientConnectionId,
*serverChosenConnId,
*sock,
*stream,
*buf,
true);
EXPECT_EQ(stream->retransmissionBuffer.size(), 1);
EXPECT_EQ(1, conn->outstandings.packets.size());
auto& streamFrame =
*getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData)
->packet.frames.front()
.asWriteStreamFrame();
PacketNum packetNum(1);
MinStreamDataFrame minDataFrame(stream->id, 1000, 100);
onRecvMinStreamDataFrame(stream, minDataFrame, packetNum);
EXPECT_EQ(stream->minimumRetransmittableOffset, buf->length());
EXPECT_TRUE(stream->retransmissionBuffer.empty());
sendAckSMHandler(*stream, streamFrame);
ASSERT_EQ(stream->sendState, StreamSendState::Closed);
ASSERT_EQ(stream->recvState, StreamRecvState::Open);
sendAckSMHandler(*stream, streamFrame);
ASSERT_EQ(stream->sendState, StreamSendState::Closed);
ASSERT_EQ(stream->recvState, StreamRecvState::Open);
}
TEST_F(QuicOpenStateTest, AckStreamAfterSkipHalfBuf) {
auto conn = createConn();
conn->partialReliabilityEnabled = true;
auto stream = conn->streamManager->createNextBidirectionalStream().value();
folly::Optional<ConnectionId> serverChosenConnId = *conn->clientConnectionId;
serverChosenConnId.value().data()[0] ^= 0x01;
EventBase evb;
auto sock = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb);
auto buf = IOBuf::copyBuffer("hello");
writeQuicPacket(
*conn,
*conn->clientConnectionId,
*serverChosenConnId,
*sock,
*stream,
*buf,
true);
EXPECT_EQ(stream->retransmissionBuffer.size(), 1);
EXPECT_EQ(1, conn->outstandings.packets.size());
auto& streamFrame =
*getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData)
->packet.frames.front()
.asWriteStreamFrame();
PacketNum packetNum(1);
// Skip ~0.5 buffers.
MinStreamDataFrame minDataFrame(stream->id, 1000, 3);
onRecvMinStreamDataFrame(stream, minDataFrame, packetNum);
EXPECT_EQ(stream->minimumRetransmittableOffset, 3);
EXPECT_EQ(stream->retransmissionBuffer.size(), 1);
sendAckSMHandler(*stream, streamFrame);
ASSERT_EQ(stream->sendState, StreamSendState::Closed);
ASSERT_EQ(stream->recvState, StreamRecvState::Open);
}
TEST_F(QuicOpenStateTest, AckStreamAfterSkipOneAndAHalfBuf) {
auto conn = createConn();
conn->partialReliabilityEnabled = true;
auto stream = conn->streamManager->createNextBidirectionalStream().value();
folly::Optional<ConnectionId> serverChosenConnId = *conn->clientConnectionId;
serverChosenConnId.value().data()[0] ^= 0x01;
EventBase evb;
auto sock = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb);
// Write two buffers.
auto buf = IOBuf::copyBuffer("hello");
writeQuicPacket(
*conn,
*conn->clientConnectionId,
*serverChosenConnId,
*sock,
*stream,
*buf,
false);
auto buf2 = IOBuf::copyBuffer("hello again");
writeQuicPacket(
*conn,
*conn->clientConnectionId,
*serverChosenConnId,
*sock,
*stream,
*buf,
true);
EXPECT_EQ(stream->retransmissionBuffer.size(), 2);
EXPECT_EQ(2, conn->outstandings.packets.size());
auto streamFrameIt =
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData);
auto& streamFrame1 =
*streamFrameIt->packet.frames.front().asWriteStreamFrame();
auto& streamFrame2 = *getNextOutstandingPacket(
*conn, PacketNumberSpace::AppData, ++streamFrameIt)
->packet.frames.front()
.asWriteStreamFrame();
PacketNum packetNum(1);
// Skip ~1.5 buffers.
MinStreamDataFrame minDataFrame(stream->id, 1000, 7);
onRecvMinStreamDataFrame(stream, minDataFrame, packetNum);
EXPECT_EQ(stream->minimumRetransmittableOffset, 7);
EXPECT_EQ(stream->retransmissionBuffer.size(), 1);
// Send ack for the first buffer, should be ignored since that buffer was
// discarded after the skip.
sendAckSMHandler(*stream, streamFrame1);
EXPECT_EQ(stream->retransmissionBuffer.size(), 1);
ASSERT_EQ(stream->sendState, StreamSendState::Open);
ASSERT_EQ(stream->recvState, StreamRecvState::Open);
// Send ack for the second buffer, should clear out the retransmit queue after
// correctly identifying adjusted offset.
sendAckSMHandler(*stream, streamFrame2);
EXPECT_TRUE(stream->retransmissionBuffer.empty());
ASSERT_EQ(stream->sendState, StreamSendState::Closed);
ASSERT_EQ(stream->recvState, StreamRecvState::Open);
}
class QuicResetSentStateTest : public Test {};
TEST_F(QuicResetSentStateTest, RstAck) {
@@ -559,53 +410,6 @@ TEST_F(QuicHalfClosedRemoteStateTest, AckStream) {
ASSERT_EQ(stream->sendState, StreamSendState::Closed);
}
TEST_F(QuicHalfClosedRemoteStateTest, AckStreamAfterSkip) {
auto conn = createConn();
// create server chosen connId with processId = 0 and workerId = 0
ServerConnectionIdParams params(0, 0, 0);
auto connIdAlgo = std::make_unique<DefaultConnectionIdAlgo>();
auto serverChosenConnId = connIdAlgo->encodeConnectionId(params);
auto stream = conn->streamManager->createNextBidirectionalStream().value();
stream->sendState = StreamSendState::Open;
stream->recvState = StreamRecvState::Closed;
EventBase evb;
auto sock = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb);
auto buf = IOBuf::copyBuffer("hello");
writeQuicPacket(
*conn,
*conn->clientConnectionId,
*serverChosenConnId,
*sock,
*stream,
*buf,
true);
EXPECT_EQ(stream->retransmissionBuffer.size(), 1);
EXPECT_EQ(1, conn->outstandings.packets.size());
auto& streamFrame =
*getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData)
->packet.frames.front()
.asWriteStreamFrame();
PacketNum packetNum(1);
MinStreamDataFrame minDataFrame(stream->id, 1000, 100);
onRecvMinStreamDataFrame(stream, minDataFrame, packetNum);
EXPECT_EQ(stream->minimumRetransmittableOffset, buf->length());
EXPECT_TRUE(stream->retransmissionBuffer.empty());
sendAckSMHandler(*stream, streamFrame);
ASSERT_EQ(stream->sendState, StreamSendState::Closed);
ASSERT_EQ(stream->recvState, StreamRecvState::Closed);
sendAckSMHandler(*stream, streamFrame);
ASSERT_EQ(stream->sendState, StreamSendState::Closed);
ASSERT_EQ(stream->recvState, StreamRecvState::Closed);
}
class QuicSendResetTest : public Test {};
TEST_F(QuicSendResetTest, FromOpen) {

View File

@@ -62,11 +62,3 @@ quic_add_test(TARGET QuicPacingFunctionsTest
DEPENDS
mvfst_state_pacing_functions
)
quic_add_test(TARGET QPRFunctionsTest
SOURCES
QPRFunctionsTest.cpp
DEPENDS
mvfst_server
mvfst_state_qpr_functions
)

View File

@@ -1,277 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
#include <quic/server/state/ServerStateMachine.h>
#include <quic/state/QPRFunctions.h>
using namespace folly;
using namespace testing;
namespace quic {
namespace test {
class QPRFunctionsTest : public Test {
public:
QPRFunctionsTest()
: conn(FizzServerQuicHandshakeContext::Builder().build()) {}
void SetUp() override {
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiLocal =
kDefaultStreamWindowSize;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote =
kDefaultStreamWindowSize;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetUni =
kDefaultStreamWindowSize;
conn.flowControlState.peerAdvertisedMaxOffset =
kDefaultConnectionWindowSize;
conn.streamManager->setMaxLocalBidirectionalStreams(
kDefaultMaxStreamsBidirectional);
conn.streamManager->setMaxLocalUnidirectionalStreams(
kDefaultMaxStreamsUnidirectional);
conn.partialReliabilityEnabled = true;
}
QuicServerConnectionState conn;
};
TEST_F(QPRFunctionsTest, RecvExpiredStreamDataFrame) {
// case1. sending only stream
auto sendingOnlyStream =
conn.streamManager->createNextUnidirectionalStream().value();
ExpiredStreamDataFrame expiredStreamDataFrame(sendingOnlyStream->id, 10);
EXPECT_THROW(
onRecvExpiredStreamDataFrame(sendingOnlyStream, expiredStreamDataFrame),
QuicTransportException);
auto stream = conn.streamManager->createNextBidirectionalStream().value();
stream->currentReceiveOffset = 100;
expiredStreamDataFrame.streamId = stream->id;
// case2. loss reordering
expiredStreamDataFrame.minimumStreamOffset = 10;
onRecvExpiredStreamDataFrame(stream, expiredStreamDataFrame);
EXPECT_EQ(stream->currentReceiveOffset, 100);
// case3. normal case
stream->currentReadOffset = 100;
stream->conn.flowControlState.sumCurReadOffset = 100;
auto buf1 = IOBuf::copyBuffer("XXXXXXXXXX"); // 140-149
StreamBuffer buffer{buf1->clone(), 140, false};
stream->readBuffer.emplace_back(std::move(buffer));
expiredStreamDataFrame.minimumStreamOffset = 145;
onRecvExpiredStreamDataFrame(stream, expiredStreamDataFrame);
EXPECT_EQ(stream->currentReceiveOffset, 145);
EXPECT_EQ(stream->currentReadOffset, 145);
EXPECT_FALSE(stream->readBuffer.empty());
EXPECT_EQ(stream->readBuffer.front().offset, 145);
EXPECT_EQ(stream->readBuffer.front().data.chainLength(), 5);
EXPECT_EQ(stream->conn.flowControlState.sumCurReadOffset, 145);
}
TEST_F(QPRFunctionsTest, AdvanceMinimumRetransmittableOffset) {
auto stream = conn.streamManager->createNextBidirectionalStream().value();
// case 0. currentWriteOffset = 0, must be moved to 4.
stream->currentWriteOffset = 0;
auto result = advanceMinimumRetransmittableOffset(stream, 4);
EXPECT_EQ(stream->currentWriteOffset, 4);
EXPECT_TRUE(result.has_value());
EXPECT_EQ(stream->minimumRetransmittableOffset, 4);
// case1. minimumRetransmittableOffset to set is too small
stream->minimumRetransmittableOffset = 10;
result = advanceMinimumRetransmittableOffset(stream, 1);
EXPECT_FALSE(result.has_value());
EXPECT_EQ(stream->minimumRetransmittableOffset, 10);
auto buf = folly::IOBuf::copyBuffer("aaaaaaaaaa");
// case2. has no unacked data below 139
stream->currentWriteOffset = 150;
stream->retransmissionBuffer.emplace(
140, std::make_unique<StreamBuffer>(buf->clone(), 140));
result = advanceMinimumRetransmittableOffset(stream, 139);
EXPECT_TRUE(result.has_value());
EXPECT_EQ(*result, 139);
EXPECT_EQ(stream->minimumRetransmittableOffset, 139);
EXPECT_EQ(stream->conn.pendingEvents.frames.size(), 1);
// case3. ExpiredStreamDataFrame is wired
stream->minimumRetransmittableOffset = 139;
stream->retransmissionBuffer.emplace(
140, std::make_unique<StreamBuffer>(buf->clone(), 140));
result = advanceMinimumRetransmittableOffset(stream, 150);
EXPECT_TRUE(result.has_value());
EXPECT_EQ(*result, 150);
EXPECT_EQ(stream->conn.pendingEvents.frames.size(), 1);
{
ExpiredStreamDataFrame* expiredFrame =
stream->conn.pendingEvents.frames[0].asExpiredStreamDataFrame();
if (expiredFrame) {
EXPECT_EQ(expiredFrame->minimumStreamOffset, 150);
}
}
EXPECT_TRUE(stream->retransmissionBuffer.empty());
// case4. update existing pending event.
stream->minimumRetransmittableOffset = 150;
stream->retransmissionBuffer.emplace(
150, std::make_unique<StreamBuffer>(buf->clone(), 150));
stream->conn.pendingEvents.frames.clear();
stream->conn.pendingEvents.frames.emplace_back(
ExpiredStreamDataFrame(stream->id, 160));
result = advanceMinimumRetransmittableOffset(stream, 200);
EXPECT_TRUE(result.has_value());
EXPECT_EQ(*result, 200);
EXPECT_EQ(stream->conn.pendingEvents.frames.size(), 1);
{
ExpiredStreamDataFrame* expiredFrame =
stream->conn.pendingEvents.frames[0].asExpiredStreamDataFrame();
if (expiredFrame) {
EXPECT_EQ(expiredFrame->minimumStreamOffset, 200);
}
}
}
TEST_F(QPRFunctionsTest, RecvMinStreamDataFrame) {
auto stream = conn.streamManager->createNextBidirectionalStream().value();
// case1. invalid frame data
MinStreamDataFrame maximumDataLessThanMinimumStreamOffset(stream->id, 5, 10);
PacketNum packetNum(10);
EXPECT_THROW(
onRecvMinStreamDataFrame(
stream, maximumDataLessThanMinimumStreamOffset, packetNum),
QuicTransportException);
// case2. offset is less than currentMinimumRetransmittableOffset
MinStreamDataFrame offsetLessThanMinimumRetransmittableOffset(
stream->id, 1000, 100);
stream->minimumRetransmittableOffset = 1000;
onRecvMinStreamDataFrame(
stream, offsetLessThanMinimumRetransmittableOffset, packetNum);
EXPECT_EQ(stream->minimumRetransmittableOffset, 1000);
// case3. normal case
stream->minimumRetransmittableOffset = 100;
MinStreamDataFrame okMinStreamDataFrame(
stream->id, stream->flowControlState.peerAdvertisedMaxOffset, 200);
onRecvMinStreamDataFrame(stream, okMinStreamDataFrame, packetNum);
EXPECT_EQ(stream->minimumRetransmittableOffset, 200);
}
TEST_F(QPRFunctionsTest, RecvMinStreamDataFrameShrinkBuffer) {
// case 1. where we have enough bytes in writeBuffer to shrink
auto stream = conn.streamManager->createNextBidirectionalStream().value();
PacketNum packetNum(10);
stream->minimumRetransmittableOffset = 100;
stream->currentWriteOffset = 100;
auto buf = folly::IOBuf::copyBuffer("aaaaaaaaaabbbbbbbbbb");
stream->writeBuffer.append(std::move(buf));
stream->conn.flowControlState.sumCurStreamBufferLen = 20;
auto writtenBuffer = folly::IOBuf::copyBuffer("cccccccccc");
stream->retransmissionBuffer.emplace(
90, std::make_unique<StreamBuffer>(std::move(writtenBuffer), 90, false));
MinStreamDataFrame shrinkMinStreamDataFrame(
stream->id, stream->flowControlState.peerAdvertisedMaxOffset, 110);
onRecvMinStreamDataFrame(stream, shrinkMinStreamDataFrame, packetNum);
EXPECT_EQ(stream->minimumRetransmittableOffset, 110);
EXPECT_EQ(stream->currentWriteOffset, 110);
EXPECT_FALSE(stream->writeBuffer.empty());
EXPECT_EQ(stream->writeBuffer.chainLength(), 10);
EXPECT_TRUE(stream->retransmissionBuffer.empty());
EXPECT_EQ(stream->conn.flowControlState.sumCurStreamBufferLen, 10);
// case 2. where we skip beyond what we have in writeBuffer
stream->minimumRetransmittableOffset = 100;
stream->currentWriteOffset = 100;
stream->writeBuffer.move();
stream->conn.flowControlState.sumCurStreamBufferLen = 0;
onRecvMinStreamDataFrame(stream, shrinkMinStreamDataFrame, packetNum);
EXPECT_EQ(stream->minimumRetransmittableOffset, 110);
EXPECT_EQ(stream->currentWriteOffset, 110);
EXPECT_EQ(stream->conn.flowControlState.sumCurStreamBufferLen, 0);
}
TEST_F(QPRFunctionsTest, RecvMinStreamDataFrameOnUnidirectionalStream) {
auto stream = conn.streamManager->createNextUnidirectionalStream().value();
stream->sendState = StreamSendState::Closed;
stream->recvState = StreamRecvState::Closed;
PacketNum packetNum(10);
MinStreamDataFrame frame(
stream->id, stream->flowControlState.peerAdvertisedMaxOffset + 100, 100);
EXPECT_THROW(
onRecvMinStreamDataFrame(stream, frame, packetNum),
QuicTransportException);
}
TEST_F(QPRFunctionsTest, AdvanceCurrentReceiveOffset) {
auto stream = conn.streamManager->createNextBidirectionalStream().value();
// case1. nothing happend
stream->currentReadOffset = 10;
stream->currentReceiveOffset = 10;
auto result = advanceCurrentReceiveOffset(stream, 1);
EXPECT_EQ(stream->currentReceiveOffset, 10);
EXPECT_FALSE(result.has_value());
// case2. MinStreamDataFrame is put on the wire
stream->currentReadOffset = 10;
stream->currentReceiveOffset = 10;
result = advanceCurrentReceiveOffset(stream, 100);
EXPECT_EQ(stream->conn.pendingEvents.frames.size(), 1);
{
MinStreamDataFrame* minStreamDataFrame =
stream->conn.pendingEvents.frames[0].asMinStreamDataFrame();
if (minStreamDataFrame) {
EXPECT_EQ(minStreamDataFrame->minimumStreamOffset, 100);
}
}
EXPECT_TRUE(result.has_value());
EXPECT_EQ(*result, 100);
// case3. update existing pending event
stream->currentReadOffset = 100;
stream->currentReceiveOffset = 100;
stream->conn.pendingEvents.frames.clear();
stream->conn.pendingEvents.frames.emplace_back(
MinStreamDataFrame(stream->id, 100, 120));
result = advanceCurrentReceiveOffset(stream, 150);
EXPECT_EQ(stream->conn.pendingEvents.frames.size(), 1);
{
MinStreamDataFrame* minStreamDataFrame =
stream->conn.pendingEvents.frames[0].asMinStreamDataFrame();
if (minStreamDataFrame) {
EXPECT_EQ(minStreamDataFrame->minimumStreamOffset, 150);
}
}
EXPECT_TRUE(result.has_value());
EXPECT_EQ(*result, 150);
// case4. where offset was adjusted
stream->currentReadOffset = 100;
stream->currentReceiveOffset = 100;
stream->finalReadOffset = folly::make_optional((uint64_t)120);
result = advanceCurrentReceiveOffset(stream, 150);
EXPECT_EQ(stream->conn.pendingEvents.frames.size(), 1);
{
MinStreamDataFrame* minStreamDataFrame =
stream->conn.pendingEvents.frames[0].asMinStreamDataFrame();
if (minStreamDataFrame) {
EXPECT_EQ(minStreamDataFrame->minimumStreamOffset, 120);
}
}
EXPECT_TRUE(result.has_value());
EXPECT_EQ(*result, 120);
}
} // namespace test
} // namespace quic

View File

@@ -1544,8 +1544,6 @@ TEST_F(QuicStreamFunctionsTest, RemovedClosedState) {
conn.streamManager->addStopSending(
streamId, GenericApplicationErrorCode::UNKNOWN);
conn.streamManager->queueFlowControlUpdated(streamId);
conn.streamManager->addDataRejected(streamId);
conn.streamManager->addDataExpired(streamId);
stream->sendState = StreamSendState::Closed;
stream->recvState = StreamRecvState::Closed;
conn.streamManager->removeClosedStream(streamId);
@@ -1559,8 +1557,6 @@ TEST_F(QuicStreamFunctionsTest, RemovedClosedState) {
EXPECT_FALSE(conn.streamManager->pendingWindowUpdate(streamId));
EXPECT_TRUE(conn.streamManager->stopSendingStreams().empty());
EXPECT_FALSE(conn.streamManager->flowControlUpdatedContains(streamId));
EXPECT_TRUE(conn.streamManager->dataRejectedStreams().empty());
EXPECT_TRUE(conn.streamManager->dataExpiredStreams().empty());
}
TEST_F(QuicServerStreamFunctionsTest, ServerGetClientQuicStream) {
@@ -2020,73 +2016,5 @@ TEST_F(QuicStreamFunctionsTest, AckCryptoStreamOffsetLengthMismatch) {
EXPECT_EQ(cryptoStream.retransmissionBuffer.size(), 1);
}
TEST_F(
QuicStreamFunctionsTest,
StreamFrameMatchesRetransmitBufferFullyReliable) {
conn.partialReliabilityEnabled = false;
StreamId id = 4;
QuicStreamState stream(id, conn);
auto data = IOBuf::copyBuffer("Hello");
auto buf = StreamBuffer(data->clone(), 0, true);
WriteStreamFrame ackFrame(
id /* streamId */,
0 /* offset */,
data->length() /* length */,
true /* eof */);
EXPECT_TRUE(streamFrameMatchesRetransmitBuffer(stream, ackFrame, buf));
}
TEST_F(
QuicStreamFunctionsTest,
StreamFrameMatchesRetransmitBufferPartiallyReliableNoSkip) {
conn.partialReliabilityEnabled = true;
StreamId id = 4;
QuicStreamState stream(id, conn);
auto data = IOBuf::copyBuffer("Hello");
auto buf = StreamBuffer(data->clone(), 0, true);
WriteStreamFrame ackFrame(
id /* streamId */,
0 /* offset */,
data->length() /* length */,
true /* eof */);
EXPECT_TRUE(streamFrameMatchesRetransmitBuffer(stream, ackFrame, buf));
}
TEST_F(
QuicStreamFunctionsTest,
StreamFrameMatchesRetransmitBufferPartiallyReliableFullBufSkipped) {
conn.partialReliabilityEnabled = true;
StreamId id = 4;
QuicStreamState stream(id, conn);
auto data = IOBuf::copyBuffer("Hello");
auto buf = StreamBuffer(data->clone(), 42, true);
WriteStreamFrame ackFrame(
id /* streamId */,
0 /* offset */,
data->length() /* length */,
true /* eof */);
EXPECT_FALSE(streamFrameMatchesRetransmitBuffer(stream, ackFrame, buf));
}
TEST_F(
QuicStreamFunctionsTest,
StreamFrameMatchesRetransmitBufferPartiallyReliableHalfBufSkipped) {
conn.partialReliabilityEnabled = true;
StreamId id = 4;
QuicStreamState stream(id, conn);
auto data = IOBuf::copyBuffer("llo");
auto buf = StreamBuffer(data->clone(), 2, true);
WriteStreamFrame ackFrame(
id /* streamId */, 0 /* offset */, 5 /* length */, true /* eof */);
EXPECT_TRUE(streamFrameMatchesRetransmitBuffer(stream, ackFrame, buf));
}
} // namespace test
} // namespace quic

View File

@@ -305,21 +305,15 @@ TEST_F(QuicStreamManagerTest, TestClearActionable) {
stream->readBuffer.emplace_back(folly::IOBuf::copyBuffer("blah blah"), 0);
manager.queueFlowControlUpdated(id);
manager.addDeliverable(id);
manager.addDataRejected(id);
manager.addDataExpired(id);
manager.updateReadableStreams(*stream);
manager.updatePeekableStreams(*stream);
EXPECT_TRUE(manager.flowControlUpdatedContains(id));
EXPECT_TRUE(manager.deliverableContains(id));
EXPECT_FALSE(manager.dataRejectedStreams().empty());
EXPECT_FALSE(manager.dataExpiredStreams().empty());
EXPECT_FALSE(manager.readableStreams().empty());
EXPECT_FALSE(manager.peekableStreams().empty());
manager.clearActionable();
EXPECT_FALSE(manager.flowControlUpdatedContains(id));
EXPECT_FALSE(manager.deliverableContains(id));
EXPECT_TRUE(manager.dataRejectedStreams().empty());
EXPECT_TRUE(manager.dataExpiredStreams().empty());
EXPECT_TRUE(manager.readableStreams().empty());
EXPECT_TRUE(manager.peekableStreams().empty());
}