From b6e134fdee698e8fe2e8e4ae81e2b869132a2ee7 Mon Sep 17 00:00:00 2001 From: Matt Joras Date: Tue, 26 Nov 2019 09:54:44 -0800 Subject: [PATCH] Use F14FastMap as the retranmission buffer. Summary: The retransmission buffer tracks stream frame data we have sent that is currently unacked. We keep this as a sorted `deque`. This isn't so bad for performance, but we can do better if we break ourselves of the requirement that it be sorted (removing a binary search on ACK). To do this we make the buffer a map of offset -> `StreamBuffer`. There were two places that were dependent on the sorted nature of the list. 1. For partial reliablity we call `shrinkBuffers` to remove all unacked buffers less than an offset. For this we now have to do it with a full traversal of the retransmission buffer instead of only having to do an O(offset) search. In the future we could make this better by only lazily deleting from the retransmission buffer on ACK or packet loss. 2. We used the start of the retransmission buffer to determine if a delivery callback could be fired for a given offset. We need some new state to track this. Instead of tracking unacked buffers, we now track acked ranges using the existing `IntervalSet`. This set should be small for the typical case, as we think most ACKs will come in order and just cause existing ranges to merge. Reviewed By: yangchi Differential Revision: D18609467 fbshipit-source-id: 13cd2164352f1183362be9f675c1bdc686426698 --- quic/api/QuicTransportFunctions.cpp | 30 ++++---- quic/api/test/QuicTransportFunctionsTest.cpp | 21 +++--- quic/api/test/QuicTransportTest.cpp | 47 +++++++----- quic/codec/QuicPacketRebuilder.cpp | 28 +++----- quic/codec/test/QuicPacketRebuilderTest.cpp | 23 +++--- quic/loss/QuicLossFunctions.cpp | 29 +++----- quic/loss/test/QuicLossFunctionsTest.cpp | 3 - quic/server/test/QuicServerTransportTest.cpp | 49 +++++++------ quic/state/QPRFunctions.cpp | 23 ++++++ quic/state/QuicStreamFunctions.cpp | 19 +++-- quic/state/StreamData.h | 16 ++++- quic/state/stream/StreamSendHandlers.cpp | 27 ++++--- .../stream/test/StreamStateFunctionsTest.cpp | 4 +- .../stream/test/StreamStateMachineTest.cpp | 71 +++++++++++++++++-- quic/state/test/QPRFunctionsTest.cpp | 10 +-- quic/state/test/QuicStreamFunctionsTest.cpp | 19 ++--- 16 files changed, 253 insertions(+), 166 deletions(-) diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index f8a20830d..04bf01194 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -98,11 +98,13 @@ void handleNewStreamDataWritten( stream.currentWriteOffset += frameLen; auto bufWritten = stream.writeBuffer.split(folly::to(frameLen)); stream.currentWriteOffset += frameFin ? 1 : 0; - CHECK( - stream.retransmissionBuffer.empty() || - stream.retransmissionBuffer.back().offset < originalOffset); - stream.retransmissionBuffer.emplace_back( - std::move(bufWritten), originalOffset, frameFin); + CHECK(stream.retransmissionBuffer + .emplace( + std::piecewise_construct, + std::forward_as_tuple(originalOffset), + std::forward_as_tuple( + std::move(bufWritten), originalOffset, frameFin)) + .second); } void handleRetransmissionWritten( @@ -126,17 +128,13 @@ void handleRetransmissionWritten( lossBufferIter->offset += frameLen; bufWritten = lossBufferIter->data.split(frameLen); } - stream.retransmissionBuffer.emplace( - std::upper_bound( - stream.retransmissionBuffer.begin(), - stream.retransmissionBuffer.end(), - frameOffset, - [](const auto& offsetIn, const auto& buffer) { - return offsetIn < buffer.offset; - }), - std::move(bufWritten), - frameOffset, - frameFin); + CHECK(stream.retransmissionBuffer + .emplace( + std::piecewise_construct, + std::forward_as_tuple(frameOffset), + std::forward_as_tuple( + std::move(bufWritten), frameOffset, frameFin)) + .second); } /** diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index ec9c84a10..bcf054655 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -224,7 +224,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { EXPECT_TRUE(conn->outstandingPackets.back().isAppLimited); EXPECT_EQ(stream1->retransmissionBuffer.size(), 1); - auto& rt1 = stream1->retransmissionBuffer.front(); + auto& rt1 = stream1->retransmissionBuffer.at(0); EXPECT_EQ(stream1->currentWriteOffset, 5); EXPECT_EQ(stream2->currentWriteOffset, 13); @@ -234,7 +234,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt1.data.front())); EXPECT_EQ(stream2->retransmissionBuffer.size(), 1); - auto& rt2 = stream2->retransmissionBuffer.front(); + auto& rt2 = stream2->retransmissionBuffer.at(0); EXPECT_EQ(rt2.offset, 0); EXPECT_TRUE(eq(*buf, *rt2.data.front())); @@ -244,9 +244,9 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { // Testing retransmission stream1->lossBuffer.push_back(std::move(rt1)); - stream1->retransmissionBuffer.pop_front(); + stream1->retransmissionBuffer.clear(); stream2->lossBuffer.push_back(std::move(rt2)); - stream2->retransmissionBuffer.pop_front(); + stream2->retransmissionBuffer.clear(); conn->streamManager->addLoss(stream1->id); conn->streamManager->addLoss(stream2->id); @@ -285,17 +285,17 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { EXPECT_EQ(stream1->lossBuffer.size(), 0); EXPECT_EQ(stream1->retransmissionBuffer.size(), 2); - auto& rt3 = stream1->retransmissionBuffer.back(); + auto& rt3 = stream1->retransmissionBuffer.at(5); EXPECT_TRUE(eq(IOBuf::copyBuffer("hats up"), rt3.data.move())); - auto& rt4 = stream1->retransmissionBuffer.front(); + auto& rt4 = stream1->retransmissionBuffer.at(0); EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt4.data.front())); // loss buffer should be split into 2. Part in retransmission buffer and // part remains in loss buffer. EXPECT_EQ(stream2->lossBuffer.size(), 1); EXPECT_EQ(stream2->retransmissionBuffer.size(), 1); - auto& rt5 = stream2->retransmissionBuffer.front(); + auto& rt5 = stream2->retransmissionBuffer.at(0); EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey wh"), *rt5.data.front())); EXPECT_EQ(rt5.offset, 0); EXPECT_EQ(rt5.eof, 0); @@ -434,7 +434,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionFinOnly) { EXPECT_TRUE(frame->fin); EXPECT_EQ(stream1->retransmissionBuffer.size(), 1); - auto& rt1 = stream1->retransmissionBuffer.front(); + auto& rt1 = stream1->retransmissionBuffer.at(0); EXPECT_EQ(stream1->currentWriteOffset, 1); EXPECT_EQ(rt1.offset, 0); @@ -481,7 +481,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionAllBytesExceptFin) { EXPECT_EQ(stream1->currentWriteOffset, buf->computeChainDataLength()); EXPECT_EQ(stream1->retransmissionBuffer.size(), 1); - auto& rt1 = stream1->retransmissionBuffer.front(); + auto& rt1 = stream1->retransmissionBuffer.at(0); EXPECT_EQ(rt1.offset, 0); EXPECT_EQ( rt1.data.front()->computeChainDataLength(), @@ -1217,9 +1217,6 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketRetxBufferSorted) { getVersion(*conn), conn->transportSettings.writeConnectionDataPacketsLimit); EXPECT_EQ(2, stream->retransmissionBuffer.size()); - EXPECT_GT( - stream->retransmissionBuffer.back().offset, - stream->retransmissionBuffer.front().offset); } TEST_F(QuicTransportFunctionsTest, NothingWritten) { diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 286b0a5f3..d2b0e9934 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -242,22 +242,17 @@ void dropPackets(QuicServerConnectionState& conn) { } auto stream = conn.streamManager->findStream(streamFrame->streamId); ASSERT_TRUE(stream); - auto itr = std::find_if( - stream->retransmissionBuffer.begin(), - stream->retransmissionBuffer.end(), - [&streamFrame](const auto& buffer) { - return streamFrame->offset == buffer.offset; - }); + auto itr = stream->retransmissionBuffer.find(streamFrame->offset); ASSERT_TRUE(itr != stream->retransmissionBuffer.end()); stream->lossBuffer.insert( std::upper_bound( stream->lossBuffer.begin(), stream->lossBuffer.end(), - itr->offset, + itr->second.offset, [](const auto& offset, const auto& buffer) { return offset < buffer.offset; }), - std::move(*itr)); + std::move(itr->second)); stream->retransmissionBuffer.erase(itr); if (std::find( conn.streamManager->lossStreams().begin(), @@ -313,15 +308,24 @@ void verifyCorrectness( // Verify retransmissionBuffer: EXPECT_FALSE(stream->retransmissionBuffer.empty()); IOBufQueue retxBufCombined; - for (auto& retxBuf : stream->retransmissionBuffer) { - retxBufCombined.append(retxBuf.data.front()->clone()); + std::vector rtxCopy; + for (auto& itr : stream->retransmissionBuffer) { + rtxCopy.push_back(StreamBuffer( + itr.second.data.front()->clone(), itr.second.offset, itr.second.eof)); + } + std::sort(rtxCopy.begin(), rtxCopy.end(), [](auto& s1, auto& s2) { + return s1.offset < s2.offset; + }); + for (auto& s : rtxCopy) { + retxBufCombined.append(s.data.move()); } EXPECT_TRUE(IOBufEqualTo()(expected, *retxBufCombined.move())); - EXPECT_EQ(finExpected, stream->retransmissionBuffer.back().eof); + EXPECT_EQ(finExpected, stream->retransmissionBuffer.at(offsets.back()).eof); std::vector retxBufOffsets; for (const auto& b : stream->retransmissionBuffer) { - retxBufOffsets.push_back(b.offset); + retxBufOffsets.push_back(b.second.offset); } + std::sort(retxBufOffsets.begin(), retxBufOffsets.end()); EXPECT_EQ(offsets, retxBufOffsets); } @@ -1196,9 +1200,8 @@ TEST_F(QuicTransportTest, CloneAfterRecvReset) { EXPECT_EQ(1, conn.outstandingPackets.size()); auto stream = conn.streamManager->getStream(streamId); EXPECT_EQ(1, stream->retransmissionBuffer.size()); - EXPECT_EQ(0, stream->retransmissionBuffer.back().offset); - EXPECT_EQ(0, stream->retransmissionBuffer.back().data.chainLength()); - EXPECT_TRUE(stream->retransmissionBuffer.back().eof); + EXPECT_EQ(0, stream->retransmissionBuffer.at(0).data.chainLength()); + EXPECT_TRUE(stream->retransmissionBuffer.at(0).eof); EXPECT_TRUE(stream->lossBuffer.empty()); EXPECT_EQ(0, stream->writeBuffer.chainLength()); EXPECT_EQ(1, stream->currentWriteOffset); @@ -1927,8 +1930,11 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) { conn.lossState.srtt = 100us; auto streamState = conn.streamManager->getStream(stream); streamState->retransmissionBuffer.clear(); - streamState->retransmissionBuffer.emplace_back( - folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false); + streamState->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(51), + std::forward_as_tuple( + folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false)); folly::SocketAddress addr; NetworkData emptyData; @@ -1970,8 +1976,11 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) { auto streamState = conn.streamManager->getStream(stream); streamState->retransmissionBuffer.clear(); streamState->lossBuffer.clear(); - streamState->retransmissionBuffer.emplace_back( - folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false); + streamState->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(51), + std::forward_as_tuple( + folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false)); streamState->lossBuffer.emplace_back( folly::IOBuf::copyBuffer("And I'm lost"), 31, false); diff --git a/quic/codec/QuicPacketRebuilder.cpp b/quic/codec/QuicPacketRebuilder.cpp index 64f09978f..c2c99f0be 100644 --- a/quic/codec/QuicPacketRebuilder.cpp +++ b/quic/codec/QuicPacketRebuilder.cpp @@ -200,23 +200,17 @@ Buf PacketRebuilder::cloneCryptoRetransmissionBuffer( * lost packet. */ DCHECK(frame.len) << "WriteCryptoFrame cloning: frame is empty. " << conn_; - auto iter = std::lower_bound( - stream.retransmissionBuffer.begin(), - stream.retransmissionBuffer.end(), - frame.offset, - [](const auto& buffer, const auto& targetOffset) { - return buffer.offset < targetOffset; - }); + auto iter = stream.retransmissionBuffer.find(frame.offset); // If the crypto stream is canceled somehow, just skip cloning this frame if (iter == stream.retransmissionBuffer.end()) { return nullptr; } - DCHECK(iter->offset == frame.offset) + DCHECK(iter->second.offset == frame.offset) << "WriteCryptoFrame cloning: offset mismatch. " << conn_; - DCHECK(iter->data.chainLength() == frame.len) + DCHECK(iter->second.data.chainLength() == frame.len) << "WriteCryptoFrame cloning: Len mismatch. " << conn_; - return iter->data.front()->clone(); + return iter->second.data.front()->clone(); } Buf PacketRebuilder::cloneRetransmissionBuffer( @@ -236,19 +230,13 @@ Buf PacketRebuilder::cloneRetransmissionBuffer( */ DCHECK(stream); DCHECK(retransmittable(*stream)); - auto iter = std::lower_bound( - stream->retransmissionBuffer.begin(), - stream->retransmissionBuffer.end(), - frame.offset, - [](const auto& buffer, const auto& targetOffset) { - return buffer.offset < targetOffset; - }); + auto iter = stream->retransmissionBuffer.find(frame.offset); if (iter != stream->retransmissionBuffer.end()) { - if (streamFrameMatchesRetransmitBuffer(*stream, frame, *iter)) { - DCHECK(!frame.len || !iter->data.empty()) + if (streamFrameMatchesRetransmitBuffer(*stream, frame, iter->second)) { + DCHECK(!frame.len || !iter->second.data.empty()) << "WriteStreamFrame cloning: frame is not empty but StreamBuffer has" << " empty data. " << conn_; - return (frame.len ? iter->data.front()->clone() : nullptr); + return (frame.len ? iter->second.data.front()->clone() : nullptr); } } return nullptr; diff --git a/quic/codec/test/QuicPacketRebuilderTest.cpp b/quic/codec/test/QuicPacketRebuilderTest.cpp index 1673d7e01..93c45053f 100644 --- a/quic/codec/test/QuicPacketRebuilderTest.cpp +++ b/quic/codec/test/QuicPacketRebuilderTest.cpp @@ -99,12 +99,13 @@ TEST_F(QuicPacketRebuilderTest, RebuildPacket) { auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(8, packet1.packet.frames.size()); stream->retransmissionBuffer.emplace( - stream->retransmissionBuffer.begin(), buf->clone(), 0, true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(buf->clone(), 0, true)); conn.cryptoState->oneRttStream.retransmissionBuffer.emplace( - conn.cryptoState->oneRttStream.retransmissionBuffer.begin(), - cryptoBuf->clone(), - 0, - true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(cryptoBuf->clone(), 0, true)); // rebuild a packet from the built out packet ShortHeader shortHeader2( @@ -240,7 +241,9 @@ TEST_F(QuicPacketRebuilderTest, FinOnlyStreamRebuild) { writeStreamFrameHeader(regularBuilder1, streamId, 0, 0, 0, true); auto packet1 = std::move(regularBuilder1).buildPacket(); stream->retransmissionBuffer.emplace( - stream->retransmissionBuffer.begin(), nullptr, 0, true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(nullptr, 0, true)); // rebuild a packet from the built out packet ShortHeader shortHeader2( @@ -294,7 +297,9 @@ TEST_F(QuicPacketRebuilderTest, RebuildDataStreamAndEmptyCryptoStream) { auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(2, packet1.packet.frames.size()); stream->retransmissionBuffer.emplace( - stream->retransmissionBuffer.begin(), buf->clone(), 0, true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(buf->clone(), 0, true)); // Do not add the buf to crypto stream's retransmission buffer, // imagine it was cleared @@ -390,7 +395,9 @@ TEST_F(QuicPacketRebuilderTest, CannotRebuild) { auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(5, packet1.packet.frames.size()); stream->retransmissionBuffer.emplace( - stream->retransmissionBuffer.begin(), buf->clone(), 0, true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(buf->clone(), 0, true)); // new builder has a much smaller writable bytes limit ShortHeader shortHeader2( diff --git a/quic/loss/QuicLossFunctions.cpp b/quic/loss/QuicLossFunctions.cpp index 665a29c4a..8d57e810d 100644 --- a/quic/loss/QuicLossFunctions.cpp +++ b/quic/loss/QuicLossFunctions.cpp @@ -96,13 +96,7 @@ void markPacketLoss( if (!stream) { break; } - auto bufferItr = std::lower_bound( - stream->retransmissionBuffer.begin(), - stream->retransmissionBuffer.end(), - frame.offset, - [](const auto& buffer, const auto& offset) { - return buffer.offset < offset; - }); + auto bufferItr = stream->retransmissionBuffer.find(frame.offset); if (bufferItr == stream->retransmissionBuffer.end()) { // It's possible that the stream was reset or data on the stream was // skipped while we discovered that its packet was lost so we might @@ -111,18 +105,19 @@ void markPacketLoss( } // The original rxmt offset might have been bumped up after it was // shrunk due to egress partially reliable skip. - if (!streamFrameMatchesRetransmitBuffer(*stream, frame, *bufferItr)) { + if (!streamFrameMatchesRetransmitBuffer( + *stream, frame, bufferItr->second)) { break; } stream->lossBuffer.insert( std::upper_bound( stream->lossBuffer.begin(), stream->lossBuffer.end(), - bufferItr->offset, + bufferItr->second.offset, [](const auto& offset, const auto& buffer) { return offset < buffer.offset; }), - std::move(*bufferItr)); + std::move(bufferItr->second)); stream->retransmissionBuffer.erase(bufferItr); conn.streamManager->updateLossStreams(*stream); break; @@ -136,28 +131,22 @@ void markPacketLoss( auto encryptionLevel = protectionTypeToEncryptionLevel(protectionType); auto cryptoStream = getCryptoStream(*conn.cryptoState, encryptionLevel); - auto bufferItr = std::lower_bound( - cryptoStream->retransmissionBuffer.begin(), - cryptoStream->retransmissionBuffer.end(), - frame.offset, - [](const auto& buffer, const auto& offset) { - return buffer.offset < offset; - }); + auto bufferItr = cryptoStream->retransmissionBuffer.find(frame.offset); if (bufferItr == cryptoStream->retransmissionBuffer.end()) { // It's possible that the stream was reset while we discovered that // it's packet was lost so we might not have the offset. break; } - DCHECK_EQ(bufferItr->offset, frame.offset); + DCHECK_EQ(bufferItr->second.offset, frame.offset); cryptoStream->lossBuffer.insert( std::upper_bound( cryptoStream->lossBuffer.begin(), cryptoStream->lossBuffer.end(), - bufferItr->offset, + bufferItr->second.offset, [](const auto& offset, const auto& buffer) { return offset < buffer.offset; }), - std::move(*bufferItr)); + std::move(bufferItr->second)); cryptoStream->retransmissionBuffer.erase(bufferItr); break; } diff --git a/quic/loss/test/QuicLossFunctionsTest.cpp b/quic/loss/test/QuicLossFunctionsTest.cpp index 5c68369a6..693f6b6b2 100644 --- a/quic/loss/test/QuicLossFunctionsTest.cpp +++ b/quic/loss/test/QuicLossFunctionsTest.cpp @@ -426,9 +426,6 @@ TEST_F(QuicLossFunctionsTest, RetxBufferSortedAfterLoss) { markPacketLoss( *conn, packet.packet, false, packet.packet.header.getPacketSequenceNum()); EXPECT_EQ(2, stream->retransmissionBuffer.size()); - EXPECT_GT( - stream->retransmissionBuffer.back().offset, - stream->retransmissionBuffer.front().offset); } TEST_F(QuicLossFunctionsTest, TestMarkCryptoLostAfterCancelRetransmission) { diff --git a/quic/server/test/QuicServerTransportTest.cpp b/quic/server/test/QuicServerTransportTest.cpp index fbdd21154..5d2873069 100644 --- a/quic/server/test/QuicServerTransportTest.cpp +++ b/quic/server/test/QuicServerTransportTest.cpp @@ -1179,12 +1179,7 @@ TEST_F(QuicServerTransportTest, TestOpenAckStreamFrame) { if (!frame) { continue; } - auto it = std::find_if( - stream->retransmissionBuffer.begin(), - stream->retransmissionBuffer.end(), - [&](auto& buffer) { - return buffer.offset == frame->offset && buffer.eof == frame->fin; - }); + auto it = stream->retransmissionBuffer.find(frame->offset); ASSERT_TRUE(it != stream->retransmissionBuffer.end()); if (currentPacket == packetNum1 && frame->streamId == streamId) { buffersInPacket1++; @@ -1358,8 +1353,10 @@ TEST_F(QuicServerTransportTest, RecvRstStreamFrame) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); writeDataToQuicStream(*stream, IOBuf::copyBuffer(words.at(3)), false); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1413,8 +1410,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrame) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1455,8 +1454,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterCloseStream) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1496,8 +1497,10 @@ TEST_F(QuicServerTransportTest, RecvInvalidMaxStreamData) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1534,8 +1537,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterHalfCloseRemote) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1606,8 +1611,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterReset) { stream1->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream1->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream1->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream1->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream1->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream1->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream1->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1615,8 +1622,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterReset) { stream2->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream2->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream2->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream2->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream2->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream2->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream2->currentReadOffset = words.at(0).length() + words.at(1).length(); diff --git a/quic/state/QPRFunctions.cpp b/quic/state/QPRFunctions.cpp index 706dfd162..a295bbd4f 100644 --- a/quic/state/QPRFunctions.cpp +++ b/quic/state/QPRFunctions.cpp @@ -34,6 +34,29 @@ void shrinkBuffers(std::deque& buffers, uint64_t offset) { } } +void shrinkBuffers( + folly::F14FastMap& buffers, + uint64_t offset) { + // Do a linear search of the entire buffer, there can be exactly one trimmed + // buffer, since we are changing the offset for that single buffer we need to + // change the offset in the StreamBuffer, but keep it keyed on the same + // offset as before so we still remove it on ack. + for (auto itr = buffers.begin(); itr != buffers.end();) { + if (itr->second.offset >= offset) { + itr++; + continue; + } + if (itr->second.offset + itr->second.data.chainLength() <= offset) { + itr = buffers.erase(itr); + } else { + uint64_t amount = offset - itr->second.offset; + itr->second.data.trimStartAtMost(amount); + itr->second.offset += amount; + itr++; + } + } +} + void shrinkRetransmittableBuffers( QuicStreamState* stream, uint64_t minimumRetransmittableOffset) { diff --git a/quic/state/QuicStreamFunctions.cpp b/quic/state/QuicStreamFunctions.cpp index 4664ec038..6d33adc9d 100644 --- a/quic/state/QuicStreamFunctions.cpp +++ b/quic/state/QuicStreamFunctions.cpp @@ -382,11 +382,14 @@ uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream) { uint64_t getStreamNextOffsetToDeliver(const QuicStreamState& stream) { auto minOffsetToDeliver = stream.currentWriteOffset; + // If the acked intervals is not empty, then the furthest acked interval + // starting at zero is the next offset. If there is no interval starting at + // zero then we cannot deliver any offsets. minOffsetToDeliver = std::min( minOffsetToDeliver, - stream.retransmissionBuffer.empty() + stream.ackedIntervals.empty() || stream.ackedIntervals.front().start != 0 ? minOffsetToDeliver - : stream.retransmissionBuffer[0].offset); + : stream.ackedIntervals.front().end); minOffsetToDeliver = std::min( minOffsetToDeliver, stream.lossBuffer.empty() ? minOffsetToDeliver @@ -426,16 +429,10 @@ void processCryptoStreamAck( QuicCryptoStream& cryptoStream, uint64_t offset, uint64_t len) { - auto ackedBuffer = std::lower_bound( - cryptoStream.retransmissionBuffer.begin(), - cryptoStream.retransmissionBuffer.end(), - offset, - [](const auto& buffer, const auto& offset) { - return buffer.offset < offset; - }); - + auto ackedBuffer = cryptoStream.retransmissionBuffer.find(offset); if (ackedBuffer == cryptoStream.retransmissionBuffer.end() || - ackedBuffer->offset != offset || ackedBuffer->data.chainLength() != len) { + ackedBuffer->second.offset != offset || + ackedBuffer->second.data.chainLength() != len) { // It's possible retransmissions of crypto data were canceled. return; } diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index 8d63ae4f2..820fe27c5 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -36,12 +36,22 @@ struct QuicStreamLike { // TODO replace with BufQueue folly::IOBufQueue writeBuffer{folly::IOBufQueue::cacheChainLength()}; - // Stores a list of buffers which have been written to the socket and are + // Stores a map of buffers which have been written to the socket and are // currently un-acked. Each one represents one StreamFrame that was written. // We need to buffer these because these might be retransmitted // in the future. - // These are sorted in order of start offset. - std::deque retransmissionBuffer; + // These are associated with the starting offset of the buffer. + // Note: the offset in the StreamBuffer itself can be >= the offset on which + // it is keyed due to partial reliability - when data is skipped the offset + // in the StreamBuffer may be incremented, but the keyed offset must remain + // the same so it can be removed from the buffer on ACK. + folly::F14FastMap retransmissionBuffer; + + // Tracks intervals which we have received ACKs for. E.g. in the case of all + // data being acked this would contain one internval from 0 -> the largest + // offseet ACKed. This allows us to track which delivery callbacks can be + // called. + IntervalSet ackedIntervals; // Stores a list of buffers which have been marked as loss by loss detector. // Each one represents one StreamFrame that was written. diff --git a/quic/state/stream/StreamSendHandlers.cpp b/quic/state/stream/StreamSendHandlers.cpp index a004ffbb4..83e4ee6b4 100644 --- a/quic/state/stream/StreamSendHandlers.cpp +++ b/quic/state/stream/StreamSendHandlers.cpp @@ -91,28 +91,25 @@ void sendAckSMHandler( switch (stream.sendState) { case StreamSendState::Open_E: { // Clean up the acked buffers from the retransmissionBuffer. - auto ackedBuffer = std::lower_bound( - stream.retransmissionBuffer.begin(), - stream.retransmissionBuffer.end(), - ackedFrame.offset, - [](const auto& buffer, const auto& offset) { - return buffer.offset < offset; - }); - + auto ackedBuffer = stream.retransmissionBuffer.find(ackedFrame.offset); if (ackedBuffer != stream.retransmissionBuffer.end()) { if (streamFrameMatchesRetransmitBuffer( - stream, ackedFrame, *ackedBuffer)) { + stream, ackedFrame, ackedBuffer->second)) { VLOG(10) << "Open: acked stream data stream=" << stream.id - << " offset=" << ackedBuffer->offset - << " len=" << ackedBuffer->data.chainLength() - << " eof=" << ackedBuffer->eof << " " << stream.conn; + << " offset=" << ackedBuffer->second.offset + << " len=" << ackedBuffer->second.data.chainLength() + << " eof=" << ackedBuffer->second.eof << " " << stream.conn; + stream.ackedIntervals.insert( + ackedBuffer->second.offset, + ackedBuffer->second.offset + + ackedBuffer->second.data.chainLength()); stream.retransmissionBuffer.erase(ackedBuffer); } else { VLOG(10) << "Open: received an ack for already discarded buffer; stream=" - << stream.id << " offset=" << ackedBuffer->offset - << " len=" << ackedBuffer->data.chainLength() - << " eof=" << ackedBuffer->eof << " " << stream.conn; + << stream.id << " offset=" << ackedBuffer->second.offset + << " len=" << ackedBuffer->second.data.chainLength() + << " eof=" << ackedBuffer->second.eof << " " << stream.conn; } } diff --git a/quic/state/stream/test/StreamStateFunctionsTest.cpp b/quic/state/stream/test/StreamStateFunctionsTest.cpp index 74956b21f..b3e5ac906 100644 --- a/quic/state/stream/test/StreamStateFunctionsTest.cpp +++ b/quic/state/stream/test/StreamStateFunctionsTest.cpp @@ -32,8 +32,8 @@ TEST_F(StreamStateFunctionsTests, BasicResetTest) { StreamBuffer(folly::IOBuf::copyBuffer(" It is not a hotdog."), 15)); writeDataToQuicStream( stream, folly::IOBuf::copyBuffer("What is it then?"), false); - stream.retransmissionBuffer.emplace_back( - folly::IOBuf::copyBuffer("How would I know?"), 34); + stream.retransmissionBuffer.emplace( + 34, StreamBuffer(folly::IOBuf::copyBuffer("How would I know?"), 34)); auto currentWriteOffset = stream.currentWriteOffset; auto currentReadOffset = stream.currentReadOffset; EXPECT_TRUE(stream.writable()); diff --git a/quic/state/stream/test/StreamStateMachineTest.cpp b/quic/state/stream/test/StreamStateMachineTest.cpp index 4e136d0e3..6e5583ce2 100644 --- a/quic/state/stream/test/StreamStateMachineTest.cpp +++ b/quic/state/stream/test/StreamStateMachineTest.cpp @@ -175,6 +175,69 @@ TEST_F(QuicOpenStateTest, AckStream) { ASSERT_EQ(stream->sendState, StreamSendState::Closed_E); } +TEST_F(QuicOpenStateTest, AckStreamMulti) { + auto conn = createConn(); + + auto stream = conn->streamManager->createNextBidirectionalStream().value(); + folly::Optional serverChosenConnId = *conn->clientConnectionId; + serverChosenConnId.value().data()[0] ^= 0x01; + EventBase evb; + auto sock = std::make_unique(&evb); + + auto buf = IOBuf::copyBuffer("hello"); + writeQuicPacket( + *conn, + *conn->clientConnectionId, + *serverChosenConnId, + *sock, + *stream, + *buf, + false); + writeQuicPacket( + *conn, + *conn->clientConnectionId, + *serverChosenConnId, + *sock, + *stream, + *IOBuf::copyBuffer("world"), + false); + writeQuicPacket( + *conn, + *conn->clientConnectionId, + *serverChosenConnId, + *sock, + *stream, + *IOBuf::copyBuffer("this is bob"), + false); + + EXPECT_EQ(stream->retransmissionBuffer.size(), 3); + EXPECT_EQ(3, conn->outstandingPackets.size()); + + auto& streamFrame3 = + *conn->outstandingPackets[2].packet.frames[0].asWriteStreamFrame(); + + sendAckSMHandler(*stream, streamFrame3); + ASSERT_EQ(stream->sendState, StreamSendState::Open_E); + ASSERT_EQ(stream->ackedIntervals.front().start, 10); + ASSERT_EQ(stream->ackedIntervals.front().end, 21); + + auto& streamFrame2 = + *conn->outstandingPackets[1].packet.frames[0].asWriteStreamFrame(); + + sendAckSMHandler(*stream, streamFrame2); + ASSERT_EQ(stream->sendState, StreamSendState::Open_E); + ASSERT_EQ(stream->ackedIntervals.front().start, 5); + ASSERT_EQ(stream->ackedIntervals.front().end, 21); + + auto& streamFrame1 = + *conn->outstandingPackets[0].packet.frames[0].asWriteStreamFrame(); + + sendAckSMHandler(*stream, streamFrame1); + ASSERT_EQ(stream->sendState, StreamSendState::Open_E); + ASSERT_EQ(stream->ackedIntervals.front().start, 0); + ASSERT_EQ(stream->ackedIntervals.front().end, 21); +} + TEST_F(QuicOpenStateTest, RetxBufferSortedAfterAck) { auto conn = createConn(); auto stream = conn->streamManager->createNextBidirectionalStream().value(); @@ -219,9 +282,6 @@ TEST_F(QuicOpenStateTest, RetxBufferSortedAfterAck) { .asWriteStreamFrame(); sendAckSMHandler(*stream, streamFrame); EXPECT_EQ(2, stream->retransmissionBuffer.size()); - EXPECT_GT( - stream->retransmissionBuffer.back().offset, - stream->retransmissionBuffer.front().offset); } TEST_F(QuicOpenStateTest, AckStreamAfterSkip) { @@ -933,7 +993,10 @@ TEST_F(QuicUnidirectionalStreamTest, OpenFinalAckStreamFrame) { stream.currentWriteOffset = 2; auto buf = folly::IOBuf::create(1); buf->append(1); - stream.retransmissionBuffer.emplace_back(std::move(buf), 1, false); + stream.retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(1), + std::forward_as_tuple(std::move(buf), 1, false)); sendAckSMHandler(stream, streamFrame); EXPECT_EQ(stream.sendState, StreamSendState::Closed_E); EXPECT_EQ(stream.recvState, StreamRecvState::Invalid_E); diff --git a/quic/state/test/QPRFunctionsTest.cpp b/quic/state/test/QPRFunctionsTest.cpp index 705b94d36..92c52cdbb 100644 --- a/quic/state/test/QPRFunctionsTest.cpp +++ b/quic/state/test/QPRFunctionsTest.cpp @@ -92,7 +92,7 @@ TEST_F(QPRFunctionsTest, AdvanceMinimumRetransmittableOffset) { auto buf = folly::IOBuf::copyBuffer("aaaaaaaaaa"); // case2. has no unacked data below 139 stream->currentWriteOffset = 150; - stream->retransmissionBuffer.emplace_back(StreamBuffer(buf->clone(), 140)); + stream->retransmissionBuffer.emplace(140, StreamBuffer(buf->clone(), 140)); result = advanceMinimumRetransmittableOffset(stream, 139); EXPECT_TRUE(result.hasValue()); EXPECT_EQ(*result, 139); @@ -101,7 +101,7 @@ TEST_F(QPRFunctionsTest, AdvanceMinimumRetransmittableOffset) { // case3. ExpiredStreamDataFrame is wired stream->minimumRetransmittableOffset = 139; - stream->retransmissionBuffer.emplace_back(StreamBuffer(buf->clone(), 140)); + stream->retransmissionBuffer.emplace(140, StreamBuffer(buf->clone(), 140)); result = advanceMinimumRetransmittableOffset(stream, 150); EXPECT_TRUE(result.hasValue()); EXPECT_EQ(*result, 150); @@ -117,7 +117,7 @@ TEST_F(QPRFunctionsTest, AdvanceMinimumRetransmittableOffset) { // case4. update existing pending event. stream->minimumRetransmittableOffset = 150; - stream->retransmissionBuffer.emplace_back(StreamBuffer(buf->clone(), 150)); + stream->retransmissionBuffer.emplace(150, StreamBuffer(buf->clone(), 150)); stream->conn.pendingEvents.frames.clear(); stream->conn.pendingEvents.frames.emplace_back( ExpiredStreamDataFrame(stream->id, 160)); @@ -171,8 +171,8 @@ TEST_F(QPRFunctionsTest, RecvMinStreamDataFrameShrinkBuffer) { stream->writeBuffer.append(std::move(buf)); stream->conn.flowControlState.sumCurStreamBufferLen = 20; auto writtenBuffer = folly::IOBuf::copyBuffer("cccccccccc"); - stream->retransmissionBuffer.emplace_back( - StreamBuffer(std::move(writtenBuffer), 90, false)); + stream->retransmissionBuffer.emplace( + 90, StreamBuffer(std::move(writtenBuffer), 90, false)); MinStreamDataFrame shrinkMinStreamDataFrame( stream->id, stream->flowControlState.peerAdvertisedMaxOffset, 110); onRecvMinStreamDataFrame(stream, shrinkMinStreamDataFrame, packetNum); diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index d7f8ad5a1..e8816c025 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -1810,7 +1810,8 @@ TEST_F(QuicStreamFunctionsTest, AllBytesTillFinAckedStillRetransmitting) { StreamId id = 3; QuicStreamState stream(id, conn); stream.finalWriteOffset = 12; - stream.retransmissionBuffer.emplace_back(IOBuf::create(10), 10, false); + stream.retransmissionBuffer.emplace( + 0, StreamBuffer(IOBuf::create(10), 10, false)); EXPECT_FALSE(allBytesTillFinAcked(stream)); } @@ -1882,15 +1883,18 @@ TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliver) { TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverRetxBuffer) { QuicStreamState stream(3, conn); stream.currentWriteOffset = 100; - stream.retransmissionBuffer.emplace_back(buildRandomInputData(10), 50); - EXPECT_EQ(50, getStreamNextOffsetToDeliver(stream)); + stream.retransmissionBuffer.emplace( + 50, StreamBuffer(buildRandomInputData(10), 50)); + stream.ackedIntervals.insert(0, 49); + EXPECT_EQ(49, getStreamNextOffsetToDeliver(stream)); } TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverRetxAndLossBuffer) { QuicStreamState stream(3, conn); stream.currentWriteOffset = 100; stream.lossBuffer.emplace_back(buildRandomInputData(10), 30); - stream.retransmissionBuffer.emplace_back(buildRandomInputData(10), 50); + stream.retransmissionBuffer.emplace( + 50, StreamBuffer(buildRandomInputData(10), 50)); EXPECT_EQ(30, getStreamNextOffsetToDeliver(stream)); } @@ -1962,8 +1966,8 @@ TEST_F(QuicStreamFunctionsTest, WritableList) { TEST_F(QuicStreamFunctionsTest, AckCryptoStream) { auto chlo = IOBuf::copyBuffer("CHLO"); - conn.cryptoState->handshakeStream.retransmissionBuffer.emplace_back( - StreamBuffer(chlo->clone(), 0)); + conn.cryptoState->handshakeStream.retransmissionBuffer.emplace( + 0, StreamBuffer(chlo->clone(), 0)); processCryptoStreamAck(conn.cryptoState->handshakeStream, 0, chlo->length()); EXPECT_EQ(conn.cryptoState->handshakeStream.retransmissionBuffer.size(), 0); } @@ -1971,8 +1975,7 @@ TEST_F(QuicStreamFunctionsTest, AckCryptoStream) { TEST_F(QuicStreamFunctionsTest, AckCryptoStreamOffsetLengthMismatch) { auto chlo = IOBuf::copyBuffer("CHLO"); auto& cryptoStream = conn.cryptoState->handshakeStream; - cryptoStream.retransmissionBuffer.emplace_back( - StreamBuffer(chlo->clone(), 0)); + cryptoStream.retransmissionBuffer.emplace(0, StreamBuffer(chlo->clone(), 0)); processCryptoStreamAck(cryptoStream, 1, chlo->length()); EXPECT_EQ(cryptoStream.retransmissionBuffer.size(), 1);