1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Cancel delivery callbacks on skip

Summary:
For the sender, upon egress or ingress skip we must check registered
delivery callbacks and invoke `onCancel()` for each offset below the skipped
offset.

Reviewed By: mjoras

Differential Revision: D15113113

fbshipit-source-id: 221badce8e92abde48e8ab2c8f3c1da9e506f54e
This commit is contained in:
Konstantin Tsoy
2019-05-06 10:20:18 -07:00
committed by Facebook Github Bot
parent bea47aa54e
commit 6b49f4e42a
5 changed files with 220 additions and 0 deletions

View File

@@ -404,6 +404,14 @@ class QuicSocket {
*/ */
virtual void cancelDeliveryCallbacksForStream(StreamId streamId) = 0; virtual void cancelDeliveryCallbacksForStream(StreamId streamId) = 0;
/**
* Invoke onCanceled on all the delivery callbacks registered for streamId for
* offsets lower than the offset provided.
*/
virtual void cancelDeliveryCallbacksForStream(
StreamId streamId,
uint64_t offset) = 0;
/** /**
* Pause/Resume read callback being triggered when data is available. * Pause/Resume read callback being triggered when data is available.
*/ */

View File

@@ -895,6 +895,12 @@ QuicTransportBase::sendDataExpired(StreamId id, uint64_t offset) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
} }
auto newOffset = advanceMinimumRetransmittableOffset(stream, offset); auto newOffset = advanceMinimumRetransmittableOffset(stream, offset);
// Invoke any delivery callbacks that are set for any offset below newOffset.
if (newOffset) {
cancelDeliveryCallbacksForStream(id, *newOffset);
}
updateWriteLooper(true); updateWriteLooper(true);
return folly::makeExpected<LocalErrorCode>(newOffset); return folly::makeExpected<LocalErrorCode>(newOffset);
} }
@@ -954,6 +960,14 @@ void QuicTransportBase::invokeDataRejectedCallbacks() {
auto dataRejectedCb = callbackData->second.dataRejectedCb; auto dataRejectedCb = callbackData->second.dataRejectedCb;
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId)); auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId));
// Invoke any delivery callbacks that are set for any offset below newly set
// minimumRetransmittableOffset.
if (!stream->streamReadError) {
cancelDeliveryCallbacksForStream(
streamId, stream->minimumRetransmittableOffset);
}
if (dataRejectedCb && !stream->streamReadError) { if (dataRejectedCb && !stream->streamReadError) {
VLOG(10) << "invoking data rejected callback on stream=" << streamId VLOG(10) << "invoking data rejected callback on stream=" << streamId
<< " " << *this; << " " << *this;
@@ -1050,6 +1064,46 @@ void QuicTransportBase::cancelDeliveryCallbacksForStream(StreamId streamId) {
deliveryCallbacks_.erase(deliveryCallbackIter); deliveryCallbacks_.erase(deliveryCallbackIter);
} }
void QuicTransportBase::cancelDeliveryCallbacksForStream(
StreamId streamId,
uint64_t offset) {
if (isReceivingStream(conn_->nodeType, streamId)) {
return;
}
auto deliveryCallbackIter = deliveryCallbacks_.find(streamId);
if (deliveryCallbackIter == deliveryCallbacks_.end()) {
conn_->streamManager->removeDeliverable(streamId);
return;
}
// Callbacks are kept sorted by offset, so we can just walk the queue and
// invoke those with offset below provided offset.
while (!deliveryCallbackIter->second.empty()) {
auto deliveryCallback = deliveryCallbackIter->second.front();
auto& cbOffset = deliveryCallback.first;
if (cbOffset < offset) {
deliveryCallbackIter->second.pop_front();
deliveryCallback.second->onCanceled(streamId, cbOffset);
if (closeState_ != CloseState::OPEN) {
// socket got closed - we can't use deliveryCallbackIter anymore,
// closeImpl should take care of delivering callbacks that are left in
// deliveryCallbackIter->second
return;
}
} else {
// Only larger or equal offsets left, exit the loop.
break;
}
}
// Clean up state for this stream if no callbacks left to invoke.
if (deliveryCallbackIter->second.empty()) {
conn_->streamManager->removeDeliverable(streamId);
deliveryCallbacks_.erase(deliveryCallbackIter);
}
}
folly::Expected<std::pair<Buf, bool>, LocalErrorCode> QuicTransportBase::read( folly::Expected<std::pair<Buf, bool>, LocalErrorCode> QuicTransportBase::read(
StreamId id, StreamId id,
size_t maxLen) { size_t maxLen) {

View File

@@ -312,6 +312,13 @@ class QuicTransportBase : public QuicSocket {
*/ */
void cancelDeliveryCallbacksForStream(StreamId streamId) override; void cancelDeliveryCallbacksForStream(StreamId streamId) override;
/**
* Invoke onCanceled on all the delivery callbacks registered for streamId for
* offsets lower than the offset provided.
*/
void cancelDeliveryCallbacksForStream(StreamId streamId, uint64_t offset)
override;
// Timeout functions // Timeout functions
class LossTimeout : public folly::HHWheelTimer::Callback { class LossTimeout : public folly::HHWheelTimer::Callback {
public: public:

View File

@@ -68,6 +68,9 @@ class MockQuicSocket : public QuicSocket {
MOCK_METHOD0(unsetAllPeekCallbacks, void()); MOCK_METHOD0(unsetAllPeekCallbacks, void());
MOCK_METHOD0(unsetAllDeliveryCallbacks, void()); MOCK_METHOD0(unsetAllDeliveryCallbacks, void());
MOCK_METHOD1(cancelDeliveryCallbacksForStream, void(StreamId)); MOCK_METHOD1(cancelDeliveryCallbacksForStream, void(StreamId));
MOCK_METHOD2(
cancelDeliveryCallbacksForStream,
void(StreamId, uint64_t offset));
MOCK_METHOD1( MOCK_METHOD1(
setConnectionFlowControlWindow, setConnectionFlowControlWindow,
folly::Expected<folly::Unit, LocalErrorCode>(uint64_t)); folly::Expected<folly::Unit, LocalErrorCode>(uint64_t));

View File

@@ -1097,6 +1097,62 @@ TEST_F(QuicTransportImplTest, DeliveryCallbackUnsetOne) {
transport->close(folly::none); transport->close(folly::none);
} }
TEST_F(QuicTransportImplTest, DeliveryCallbackOnSendDataExpire) {
InSequence enforceOrder;
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
MockDeliveryCallback dcb1;
MockDeliveryCallback dcb2;
transport->registerDeliveryCallback(stream1, 10, &dcb1);
transport->registerDeliveryCallback(stream2, 20, &dcb2);
EXPECT_CALL(dcb1, onCanceled(_, _));
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
auto res = transport->sendDataExpired(stream1, 11);
EXPECT_EQ(res.hasError(), false);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(_, _));
transport->close(folly::none);
}
TEST_F(QuicTransportImplTest, DeliveryCallbackOnSendDataExpireCallbacksLeft) {
InSequence enforceOrder;
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
MockDeliveryCallback dcb1;
MockDeliveryCallback dcb2;
transport->registerDeliveryCallback(stream1, 10, &dcb1);
transport->registerDeliveryCallback(stream1, 20, &dcb1);
transport->registerDeliveryCallback(stream2, 20, &dcb2);
EXPECT_CALL(dcb1, onCanceled(_, _));
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
auto res = transport->sendDataExpired(stream1, 11);
EXPECT_EQ(res.hasError(), false);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_CALL(dcb2, onCanceled(_, _));
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(1);
transport->close(folly::none);
}
TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackLowerThanExpected) { TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackLowerThanExpected) {
auto stream = transport->createBidirectionalStream().value(); auto stream = transport->createBidirectionalStream().value();
MockDeliveryCallback dcb1; MockDeliveryCallback dcb1;
@@ -2166,6 +2222,98 @@ TEST_F(QuicTransportImplTest, DataRejecteddCallbackDataAvailable) {
transport.reset(); transport.reset();
} }
TEST_F(QuicTransportImplTest, DataRejecteddCallbackWithDeliveryCallbacks) {
InSequence enforceOrder;
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
MockDeliveryCallback dcb1;
MockDeliveryCallback dcb2;
MockDataRejectedCallback dataRejectedCb1;
MockDataRejectedCallback dataRejectedCb2;
transport->registerDeliveryCallback(stream1, 10, &dcb1);
transport->registerDeliveryCallback(stream2, 20, &dcb2);
transport->setDataRejectedCallback(stream1, &dataRejectedCb1);
transport->setDataRejectedCallback(stream2, &dataRejectedCb2);
EXPECT_CALL(dcb1, onCanceled(stream1, 10)).Times(1);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
EXPECT_CALL(dataRejectedCb1, onDataRejected(stream1, 15));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 15));
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dataRejectedCb1);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(stream2, 20)).Times(1);
EXPECT_CALL(dataRejectedCb2, onDataRejected(stream2, 23));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream2, kDefaultStreamWindowSize, 23));
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dataRejectedCb2);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
transport->close(folly::none);
}
TEST_F(
QuicTransportImplTest,
DataRejecteddCallbackWithDeliveryCallbacksSomeLeft) {
InSequence enforceOrder;
transport->transportConn->partialReliabilityEnabled = true;
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
MockDeliveryCallback dcb1;
MockDeliveryCallback dcb2;
MockDataRejectedCallback dataRejectedCb1;
MockDataRejectedCallback dataRejectedCb2;
transport->registerDeliveryCallback(stream1, 10, &dcb1);
transport->registerDeliveryCallback(stream1, 25, &dcb1);
transport->registerDeliveryCallback(stream2, 20, &dcb2);
transport->registerDeliveryCallback(stream2, 29, &dcb2);
transport->setDataRejectedCallback(stream1, &dataRejectedCb1);
transport->setDataRejectedCallback(stream2, &dataRejectedCb2);
EXPECT_CALL(dcb1, onCanceled(stream1, 10)).Times(1);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
EXPECT_CALL(dataRejectedCb1, onDataRejected(stream1, 15));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream1, kDefaultStreamWindowSize, 15));
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dataRejectedCb1);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(stream2, 20)).Times(1);
EXPECT_CALL(dataRejectedCb2, onDataRejected(stream2, 23));
transport->addMinStreamDataFrameToStream(
MinStreamDataFrame(stream2, kDefaultStreamWindowSize, 23));
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dataRejectedCb2);
EXPECT_CALL(dcb2, onCanceled(stream2, 29)).Times(1);
EXPECT_CALL(dcb1, onCanceled(stream1, 25)).Times(1);
transport->close(folly::none);
}
TEST_F(QuicTransportImplTest, DataRejectedCallbackChangeCallback) { TEST_F(QuicTransportImplTest, DataRejectedCallbackChangeCallback) {
transport->transportConn->partialReliabilityEnabled = true; transport->transportConn->partialReliabilityEnabled = true;