1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Don't use currentWriteOffset as highest ack offset

Summary: Don't use currentWriteOffset as highest ack offset

Reviewed By: yangchi

Differential Revision: D21679541

fbshipit-source-id: de8814aef4959abc2e10402c5d5e294ef03f8b19
This commit is contained in:
Konstantin Tsoy
2020-05-28 12:44:32 -07:00
committed by Facebook GitHub Bot
parent 1f38f6b252
commit b1cb1d32af
6 changed files with 36 additions and 37 deletions

View File

@@ -1474,16 +1474,16 @@ void QuicTransportBase::processCallbacksAfterNetworkData() {
while (deliverableStreamId.has_value()) { while (deliverableStreamId.has_value()) {
auto streamId = *deliverableStreamId; auto streamId = *deliverableStreamId;
auto stream = conn_->streamManager->getStream(streamId); auto stream = conn_->streamManager->getStream(streamId);
auto minOffsetToDeliver = getStreamNextOffsetToDeliver(*stream); auto maxOffsetToDeliver = getLargestDeliverableOffset(*stream);
while (true) { while (maxOffsetToDeliver.has_value()) {
auto deliveryCallbacksForAckedStream = deliveryCallbacks_.find(streamId); auto deliveryCallbacksForAckedStream = deliveryCallbacks_.find(streamId);
if (deliveryCallbacksForAckedStream == deliveryCallbacks_.end() || if (deliveryCallbacksForAckedStream == deliveryCallbacks_.end() ||
deliveryCallbacksForAckedStream->second.empty()) { deliveryCallbacksForAckedStream->second.empty()) {
break; break;
} }
if (deliveryCallbacksForAckedStream->second.front().first > if (deliveryCallbacksForAckedStream->second.front().first >
minOffsetToDeliver) { *maxOffsetToDeliver) {
break; break;
} }
auto deliveryCallbackAndOffset = auto deliveryCallbackAndOffset =
@@ -1936,8 +1936,8 @@ QuicTransportBase::registerDeliveryCallback(
deliveryCallbackIt->second.emplace(pos, offset, cb); deliveryCallbackIt->second.emplace(pos, offset, cb);
} }
auto stream = conn_->streamManager->getStream(id); auto stream = conn_->streamManager->getStream(id);
auto minOffsetToDelivery = getStreamNextOffsetToDeliver(*stream); auto maxOffsetToDeliver = getLargestDeliverableOffset(*stream);
if (offset < minOffsetToDelivery) { if (maxOffsetToDeliver.has_value() && (offset < *maxOffsetToDeliver)) {
// This offset is already delivered // This offset is already delivered
runOnEvbAsync([id, cb, offset](auto selfObj) { runOnEvbAsync([id, cb, offset](auto selfObj) {
if (selfObj->closeState_ != CloseState::OPEN) { if (selfObj->closeState_ != CloseState::OPEN) {

View File

@@ -1296,6 +1296,7 @@ TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackLowerThanExpected) {
transport->registerDeliveryCallback(stream, 20, &dcb2); transport->registerDeliveryCallback(stream, 20, &dcb2);
auto streamState = transport->transportConn->streamManager->getStream(stream); auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7; streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
EXPECT_CALL(dcb3, onDeliveryAck(_, _, _)) EXPECT_CALL(dcb3, onDeliveryAck(_, _, _))
.WillOnce(Invoke( .WillOnce(Invoke(

View File

@@ -1802,9 +1802,11 @@ TEST_F(QuicTransportTest, DeliveryCallbackClosesTransportOnDelivered) {
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
auto streamState = conn.streamManager->getStream(stream1);
conn.streamManager->addDeliverable(stream1); conn.streamManager->addDeliverable(stream1);
folly::SocketAddress addr; folly::SocketAddress addr;
NetworkData emptyData; NetworkData emptyData;
streamState->ackedIntervals.insert(0, 19);
// This will invoke the DeliveryClalback::onDelivered // This will invoke the DeliveryClalback::onDelivered
transport_->onNetworkData(addr, std::move(emptyData)); transport_->onNetworkData(addr, std::move(emptyData));
} }
@@ -1818,19 +1820,22 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksNothingDelivered) {
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false, false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState();
auto streamState = conn.streamManager->getStream(stream);
folly::SocketAddress addr; folly::SocketAddress addr;
NetworkData emptyData; NetworkData emptyData;
transport_->onNetworkData(addr, std::move(emptyData)); transport_->onNetworkData(addr, std::move(emptyData));
streamState->ackedIntervals.insert(0, 19);
// Clear out the other delivery callbacks before tear down transport. // Clear out the other delivery callbacks before tear down transport.
// Otherwise, transport will be holding on to delivery callback pointers // Otherwise, transport will be holding on to delivery callback pointers
// that are already dead: // that are already dead:
auto buf2 = buildRandomInputData(100); auto buf2 = buildRandomInputData(100);
transport_->writeChain(stream, buf2->clone(), true, false); transport_->writeChain(stream, buf2->clone(), true, false);
streamState->ackedIntervals.insert(20, 99);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState();
auto streamState = conn.streamManager->getStream(stream);
streamState->retransmissionBuffer.clear(); streamState->retransmissionBuffer.clear();
streamState->lossBuffer.clear(); streamState->lossBuffer.clear();
conn.streamManager->addDeliverable(stream); conn.streamManager->addDeliverable(stream);
@@ -1855,6 +1860,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksAllDelivered) {
conn.lossState.srtt = 100us; conn.lossState.srtt = 100us;
auto streamState = conn.streamManager->getStream(stream); auto streamState = conn.streamManager->getStream(stream);
streamState->retransmissionBuffer.clear(); streamState->retransmissionBuffer.clear();
streamState->ackedIntervals.insert(0, 1);
folly::SocketAddress addr; folly::SocketAddress addr;
NetworkData emptyData; NetworkData emptyData;
@@ -1882,6 +1888,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksPartialDelivered) {
folly::SocketAddress addr; folly::SocketAddress addr;
NetworkData emptyData; NetworkData emptyData;
streamState->ackedIntervals.insert(0, 99);
EXPECT_CALL(mockedDeliveryCallback1, onDeliveryAck(stream, 50, 100us)) EXPECT_CALL(mockedDeliveryCallback1, onDeliveryAck(stream, 50, 100us))
.Times(1); .Times(1);
transport_->onNetworkData(addr, std::move(emptyData)); transport_->onNetworkData(addr, std::move(emptyData));
@@ -1896,6 +1903,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksPartialDelivered) {
streamState->lossBuffer.clear(); streamState->lossBuffer.clear();
conn.streamManager->addDeliverable(stream); conn.streamManager->addDeliverable(stream);
NetworkData emptyData2; NetworkData emptyData2;
streamState->ackedIntervals.insert(100, 199);
EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 150, 100us)) EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 150, 100us))
.Times(1); .Times(1);
transport_->onNetworkData(addr, std::move(emptyData2)); transport_->onNetworkData(addr, std::move(emptyData2));
@@ -1926,6 +1934,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) {
folly::SocketAddress addr; folly::SocketAddress addr;
NetworkData emptyData; NetworkData emptyData;
streamState->ackedIntervals.insert(0, 49);
EXPECT_CALL(mockedDeliveryCallback1, onDeliveryAck(stream, 50, 100us)) EXPECT_CALL(mockedDeliveryCallback1, onDeliveryAck(stream, 50, 100us))
.Times(1); .Times(1);
transport_->onNetworkData(addr, std::move(emptyData)); transport_->onNetworkData(addr, std::move(emptyData));
@@ -1940,6 +1949,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) {
streamState->lossBuffer.clear(); streamState->lossBuffer.clear();
conn.streamManager->addDeliverable(stream); conn.streamManager->addDeliverable(stream);
NetworkData emptyData2; NetworkData emptyData2;
streamState->ackedIntervals.insert(50, 199);
EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 150, 100us)) EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 150, 100us))
.Times(1); .Times(1);
transport_->onNetworkData(addr, std::move(emptyData2)); transport_->onNetworkData(addr, std::move(emptyData2));
@@ -1971,6 +1981,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) {
folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false))); folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false)));
streamState->lossBuffer.emplace_back( streamState->lossBuffer.emplace_back(
folly::IOBuf::copyBuffer("And I'm lost"), 31, false); folly::IOBuf::copyBuffer("And I'm lost"), 31, false);
streamState->ackedIntervals.insert(0, 30);
folly::SocketAddress addr; folly::SocketAddress addr;
NetworkData emptyData; NetworkData emptyData;
@@ -1988,6 +1999,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) {
streamState->lossBuffer.clear(); streamState->lossBuffer.clear();
conn.streamManager->addDeliverable(stream); conn.streamManager->addDeliverable(stream);
NetworkData emptyData2; NetworkData emptyData2;
streamState->ackedIntervals.insert(31, 199);
EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 50, 100us)) EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 50, 100us))
.Times(1); .Times(1);
EXPECT_CALL(mockedDeliveryCallback3, onDeliveryAck(stream, 150, 100us)) EXPECT_CALL(mockedDeliveryCallback3, onDeliveryAck(stream, 150, 100us))

View File

@@ -381,21 +381,16 @@ uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream) {
stream.currentWriteOffset + stream.writeBuffer.chainLength()); stream.currentWriteOffset + stream.writeBuffer.chainLength());
} }
uint64_t getStreamNextOffsetToDeliver(const QuicStreamState& stream) { folly::Optional<uint64_t> getLargestDeliverableOffset(
auto minOffsetToDeliver = stream.currentWriteOffset; const QuicStreamState& stream) {
// If the acked intervals is not empty, then the furthest acked interval // If the acked intervals is not empty, then the furthest acked interval
// starting at zero is the next offset. If there is no interval starting at // starting at zero is the next offset. If there is no interval starting at
// zero then we cannot deliver any offsets. // zero then we cannot deliver any offsets.
minOffsetToDeliver = std::min( if (stream.ackedIntervals.empty() ||
minOffsetToDeliver, stream.ackedIntervals.front().start != 0) {
stream.ackedIntervals.empty() || stream.ackedIntervals.front().start != 0 return folly::none;
? minOffsetToDeliver }
: stream.ackedIntervals.front().end); return stream.ackedIntervals.front().end;
minOffsetToDeliver = std::min(
minOffsetToDeliver,
stream.lossBuffer.empty() ? minOffsetToDeliver
: stream.lossBuffer[0].offset);
return minOffsetToDeliver;
} }
// TODO reap // TODO reap

View File

@@ -93,9 +93,11 @@ void appendPendingStreamReset(
uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream); uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream);
/** /**
* Get the the minimal write offset that's yet to deliver to peer * Get the the highest acked offset (if any) that we can execute delivery
* callbacks on.
*/ */
uint64_t getStreamNextOffsetToDeliver(const QuicStreamState& stream); folly::Optional<uint64_t> getLargestDeliverableOffset(
const QuicStreamState& stream);
/** /**
* Common functions for merging data into the read buffer for a Quic stream like * Common functions for merging data into the read buffer for a Quic stream like

View File

@@ -1880,28 +1880,17 @@ TEST_F(QuicStreamFunctionsTest, LargestWriteOffsetSeenNoFIN) {
EXPECT_EQ(120, getLargestWriteOffsetSeen(stream)); EXPECT_EQ(120, getLargestWriteOffsetSeen(stream));
} }
TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliver) { TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverNothingAcked) {
QuicStreamState stream(3, conn); QuicStreamState stream(3, conn);
stream.currentWriteOffset = 100; stream.currentWriteOffset = 100;
EXPECT_EQ(100, getStreamNextOffsetToDeliver(stream)); EXPECT_EQ(folly::none, getLargestDeliverableOffset(stream));
} }
TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverRetxBuffer) { TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverAllAcked) {
QuicStreamState stream(3, conn); QuicStreamState stream(3, conn);
stream.currentWriteOffset = 100; stream.currentWriteOffset = 100;
stream.retransmissionBuffer.emplace( stream.ackedIntervals.insert(0, 99);
50, std::make_unique<StreamBuffer>(buildRandomInputData(10), 50)); EXPECT_EQ(99, getLargestDeliverableOffset(stream).value());
stream.ackedIntervals.insert(0, 49);
EXPECT_EQ(49, getStreamNextOffsetToDeliver(stream));
}
TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverRetxAndLossBuffer) {
QuicStreamState stream(3, conn);
stream.currentWriteOffset = 100;
stream.lossBuffer.emplace_back(buildRandomInputData(10), 30);
stream.retransmissionBuffer.emplace(
50, std::make_unique<StreamBuffer>(buildRandomInputData(10), 50));
EXPECT_EQ(30, getStreamNextOffsetToDeliver(stream));
} }
TEST_F(QuicStreamFunctionsTest, LossBufferEmpty) { TEST_F(QuicStreamFunctionsTest, LossBufferEmpty) {