diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 70f5f2428..3e8ef7aab 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1474,16 +1474,16 @@ void QuicTransportBase::processCallbacksAfterNetworkData() { while (deliverableStreamId.has_value()) { auto streamId = *deliverableStreamId; auto stream = conn_->streamManager->getStream(streamId); - auto minOffsetToDeliver = getStreamNextOffsetToDeliver(*stream); + auto maxOffsetToDeliver = getLargestDeliverableOffset(*stream); - while (true) { + while (maxOffsetToDeliver.has_value()) { auto deliveryCallbacksForAckedStream = deliveryCallbacks_.find(streamId); if (deliveryCallbacksForAckedStream == deliveryCallbacks_.end() || deliveryCallbacksForAckedStream->second.empty()) { break; } if (deliveryCallbacksForAckedStream->second.front().first > - minOffsetToDeliver) { + *maxOffsetToDeliver) { break; } auto deliveryCallbackAndOffset = @@ -1936,8 +1936,8 @@ QuicTransportBase::registerDeliveryCallback( deliveryCallbackIt->second.emplace(pos, offset, cb); } auto stream = conn_->streamManager->getStream(id); - auto minOffsetToDelivery = getStreamNextOffsetToDeliver(*stream); - if (offset < minOffsetToDelivery) { + auto maxOffsetToDeliver = getLargestDeliverableOffset(*stream); + if (maxOffsetToDeliver.has_value() && (offset < *maxOffsetToDeliver)) { // This offset is already delivered runOnEvbAsync([id, cb, offset](auto selfObj) { if (selfObj->closeState_ != CloseState::OPEN) { diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 164097d6a..8976044fe 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -1296,6 +1296,7 @@ TEST_F(QuicTransportImplTest, RegisterDeliveryCallbackLowerThanExpected) { transport->registerDeliveryCallback(stream, 20, &dcb2); auto streamState = transport->transportConn->streamManager->getStream(stream); streamState->currentWriteOffset = 7; + streamState->ackedIntervals.insert(0, 6); EXPECT_CALL(dcb3, onDeliveryAck(_, _, _)) .WillOnce(Invoke( diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 972bcc1c0..b0e7c7944 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -1802,9 +1802,11 @@ TEST_F(QuicTransportTest, DeliveryCallbackClosesTransportOnDelivered) { loopForWrites(); auto& conn = transport_->getConnectionState(); + auto streamState = conn.streamManager->getStream(stream1); conn.streamManager->addDeliverable(stream1); folly::SocketAddress addr; NetworkData emptyData; + streamState->ackedIntervals.insert(0, 19); // This will invoke the DeliveryClalback::onDelivered transport_->onNetworkData(addr, std::move(emptyData)); } @@ -1818,19 +1820,22 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksNothingDelivered) { transport_->writeChain(stream, buf->clone(), false, false); loopForWrites(); + auto& conn = transport_->getConnectionState(); + auto streamState = conn.streamManager->getStream(stream); + folly::SocketAddress addr; NetworkData emptyData; transport_->onNetworkData(addr, std::move(emptyData)); + streamState->ackedIntervals.insert(0, 19); // Clear out the other delivery callbacks before tear down transport. // Otherwise, transport will be holding on to delivery callback pointers // that are already dead: auto buf2 = buildRandomInputData(100); transport_->writeChain(stream, buf2->clone(), true, false); + streamState->ackedIntervals.insert(20, 99); loopForWrites(); - auto& conn = transport_->getConnectionState(); - auto streamState = conn.streamManager->getStream(stream); streamState->retransmissionBuffer.clear(); streamState->lossBuffer.clear(); conn.streamManager->addDeliverable(stream); @@ -1855,6 +1860,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksAllDelivered) { conn.lossState.srtt = 100us; auto streamState = conn.streamManager->getStream(stream); streamState->retransmissionBuffer.clear(); + streamState->ackedIntervals.insert(0, 1); folly::SocketAddress addr; NetworkData emptyData; @@ -1882,6 +1888,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksPartialDelivered) { folly::SocketAddress addr; NetworkData emptyData; + streamState->ackedIntervals.insert(0, 99); EXPECT_CALL(mockedDeliveryCallback1, onDeliveryAck(stream, 50, 100us)) .Times(1); transport_->onNetworkData(addr, std::move(emptyData)); @@ -1896,6 +1903,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksPartialDelivered) { streamState->lossBuffer.clear(); conn.streamManager->addDeliverable(stream); NetworkData emptyData2; + streamState->ackedIntervals.insert(100, 199); EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 150, 100us)) .Times(1); transport_->onNetworkData(addr, std::move(emptyData2)); @@ -1926,6 +1934,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) { folly::SocketAddress addr; NetworkData emptyData; + streamState->ackedIntervals.insert(0, 49); EXPECT_CALL(mockedDeliveryCallback1, onDeliveryAck(stream, 50, 100us)) .Times(1); transport_->onNetworkData(addr, std::move(emptyData)); @@ -1940,6 +1949,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) { streamState->lossBuffer.clear(); conn.streamManager->addDeliverable(stream); NetworkData emptyData2; + streamState->ackedIntervals.insert(50, 199); EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 150, 100us)) .Times(1); transport_->onNetworkData(addr, std::move(emptyData2)); @@ -1971,6 +1981,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) { folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false))); streamState->lossBuffer.emplace_back( folly::IOBuf::copyBuffer("And I'm lost"), 31, false); + streamState->ackedIntervals.insert(0, 30); folly::SocketAddress addr; NetworkData emptyData; @@ -1988,6 +1999,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) { streamState->lossBuffer.clear(); conn.streamManager->addDeliverable(stream); NetworkData emptyData2; + streamState->ackedIntervals.insert(31, 199); EXPECT_CALL(mockedDeliveryCallback2, onDeliveryAck(stream, 50, 100us)) .Times(1); EXPECT_CALL(mockedDeliveryCallback3, onDeliveryAck(stream, 150, 100us)) diff --git a/quic/state/QuicStreamFunctions.cpp b/quic/state/QuicStreamFunctions.cpp index 3b7d73294..dcd9def5b 100644 --- a/quic/state/QuicStreamFunctions.cpp +++ b/quic/state/QuicStreamFunctions.cpp @@ -381,21 +381,16 @@ uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream) { stream.currentWriteOffset + stream.writeBuffer.chainLength()); } -uint64_t getStreamNextOffsetToDeliver(const QuicStreamState& stream) { - auto minOffsetToDeliver = stream.currentWriteOffset; +folly::Optional getLargestDeliverableOffset( + const QuicStreamState& stream) { // If the acked intervals is not empty, then the furthest acked interval // starting at zero is the next offset. If there is no interval starting at // zero then we cannot deliver any offsets. - minOffsetToDeliver = std::min( - minOffsetToDeliver, - stream.ackedIntervals.empty() || stream.ackedIntervals.front().start != 0 - ? minOffsetToDeliver - : stream.ackedIntervals.front().end); - minOffsetToDeliver = std::min( - minOffsetToDeliver, - stream.lossBuffer.empty() ? minOffsetToDeliver - : stream.lossBuffer[0].offset); - return minOffsetToDeliver; + if (stream.ackedIntervals.empty() || + stream.ackedIntervals.front().start != 0) { + return folly::none; + } + return stream.ackedIntervals.front().end; } // TODO reap diff --git a/quic/state/QuicStreamFunctions.h b/quic/state/QuicStreamFunctions.h index d264e3b91..bd7e59930 100644 --- a/quic/state/QuicStreamFunctions.h +++ b/quic/state/QuicStreamFunctions.h @@ -93,9 +93,11 @@ void appendPendingStreamReset( uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream); /** - * Get the the minimal write offset that's yet to deliver to peer + * Get the the highest acked offset (if any) that we can execute delivery + * callbacks on. */ -uint64_t getStreamNextOffsetToDeliver(const QuicStreamState& stream); +folly::Optional getLargestDeliverableOffset( + const QuicStreamState& stream); /** * Common functions for merging data into the read buffer for a Quic stream like diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index 89acd13fd..e1cf41249 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -1880,28 +1880,17 @@ TEST_F(QuicStreamFunctionsTest, LargestWriteOffsetSeenNoFIN) { EXPECT_EQ(120, getLargestWriteOffsetSeen(stream)); } -TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliver) { +TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverNothingAcked) { QuicStreamState stream(3, conn); stream.currentWriteOffset = 100; - EXPECT_EQ(100, getStreamNextOffsetToDeliver(stream)); + EXPECT_EQ(folly::none, getLargestDeliverableOffset(stream)); } -TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverRetxBuffer) { +TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverAllAcked) { QuicStreamState stream(3, conn); stream.currentWriteOffset = 100; - stream.retransmissionBuffer.emplace( - 50, std::make_unique(buildRandomInputData(10), 50)); - stream.ackedIntervals.insert(0, 49); - EXPECT_EQ(49, getStreamNextOffsetToDeliver(stream)); -} - -TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverRetxAndLossBuffer) { - QuicStreamState stream(3, conn); - stream.currentWriteOffset = 100; - stream.lossBuffer.emplace_back(buildRandomInputData(10), 30); - stream.retransmissionBuffer.emplace( - 50, std::make_unique(buildRandomInputData(10), 50)); - EXPECT_EQ(30, getStreamNextOffsetToDeliver(stream)); + stream.ackedIntervals.insert(0, 99); + EXPECT_EQ(99, getLargestDeliverableOffset(stream).value()); } TEST_F(QuicStreamFunctionsTest, LossBufferEmpty) {