diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index f8a20830d..04bf01194 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -98,11 +98,13 @@ void handleNewStreamDataWritten( stream.currentWriteOffset += frameLen; auto bufWritten = stream.writeBuffer.split(folly::to(frameLen)); stream.currentWriteOffset += frameFin ? 1 : 0; - CHECK( - stream.retransmissionBuffer.empty() || - stream.retransmissionBuffer.back().offset < originalOffset); - stream.retransmissionBuffer.emplace_back( - std::move(bufWritten), originalOffset, frameFin); + CHECK(stream.retransmissionBuffer + .emplace( + std::piecewise_construct, + std::forward_as_tuple(originalOffset), + std::forward_as_tuple( + std::move(bufWritten), originalOffset, frameFin)) + .second); } void handleRetransmissionWritten( @@ -126,17 +128,13 @@ void handleRetransmissionWritten( lossBufferIter->offset += frameLen; bufWritten = lossBufferIter->data.split(frameLen); } - stream.retransmissionBuffer.emplace( - std::upper_bound( - stream.retransmissionBuffer.begin(), - stream.retransmissionBuffer.end(), - frameOffset, - [](const auto& offsetIn, const auto& buffer) { - return offsetIn < buffer.offset; - }), - std::move(bufWritten), - frameOffset, - frameFin); + CHECK(stream.retransmissionBuffer + .emplace( + std::piecewise_construct, + std::forward_as_tuple(frameOffset), + std::forward_as_tuple( + std::move(bufWritten), frameOffset, frameFin)) + .second); } /** diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index ec9c84a10..bcf054655 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -224,7 +224,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { EXPECT_TRUE(conn->outstandingPackets.back().isAppLimited); EXPECT_EQ(stream1->retransmissionBuffer.size(), 1); - auto& rt1 = stream1->retransmissionBuffer.front(); + auto& rt1 = stream1->retransmissionBuffer.at(0); EXPECT_EQ(stream1->currentWriteOffset, 5); EXPECT_EQ(stream2->currentWriteOffset, 13); @@ -234,7 +234,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt1.data.front())); EXPECT_EQ(stream2->retransmissionBuffer.size(), 1); - auto& rt2 = stream2->retransmissionBuffer.front(); + auto& rt2 = stream2->retransmissionBuffer.at(0); EXPECT_EQ(rt2.offset, 0); EXPECT_TRUE(eq(*buf, *rt2.data.front())); @@ -244,9 +244,9 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { // Testing retransmission stream1->lossBuffer.push_back(std::move(rt1)); - stream1->retransmissionBuffer.pop_front(); + stream1->retransmissionBuffer.clear(); stream2->lossBuffer.push_back(std::move(rt2)); - stream2->retransmissionBuffer.pop_front(); + stream2->retransmissionBuffer.clear(); conn->streamManager->addLoss(stream1->id); conn->streamManager->addLoss(stream2->id); @@ -285,17 +285,17 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { EXPECT_EQ(stream1->lossBuffer.size(), 0); EXPECT_EQ(stream1->retransmissionBuffer.size(), 2); - auto& rt3 = stream1->retransmissionBuffer.back(); + auto& rt3 = stream1->retransmissionBuffer.at(5); EXPECT_TRUE(eq(IOBuf::copyBuffer("hats up"), rt3.data.move())); - auto& rt4 = stream1->retransmissionBuffer.front(); + auto& rt4 = stream1->retransmissionBuffer.at(0); EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt4.data.front())); // loss buffer should be split into 2. Part in retransmission buffer and // part remains in loss buffer. EXPECT_EQ(stream2->lossBuffer.size(), 1); EXPECT_EQ(stream2->retransmissionBuffer.size(), 1); - auto& rt5 = stream2->retransmissionBuffer.front(); + auto& rt5 = stream2->retransmissionBuffer.at(0); EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey wh"), *rt5.data.front())); EXPECT_EQ(rt5.offset, 0); EXPECT_EQ(rt5.eof, 0); @@ -434,7 +434,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionFinOnly) { EXPECT_TRUE(frame->fin); EXPECT_EQ(stream1->retransmissionBuffer.size(), 1); - auto& rt1 = stream1->retransmissionBuffer.front(); + auto& rt1 = stream1->retransmissionBuffer.at(0); EXPECT_EQ(stream1->currentWriteOffset, 1); EXPECT_EQ(rt1.offset, 0); @@ -481,7 +481,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionAllBytesExceptFin) { EXPECT_EQ(stream1->currentWriteOffset, buf->computeChainDataLength()); EXPECT_EQ(stream1->retransmissionBuffer.size(), 1); - auto& rt1 = stream1->retransmissionBuffer.front(); + auto& rt1 = stream1->retransmissionBuffer.at(0); EXPECT_EQ(rt1.offset, 0); EXPECT_EQ( rt1.data.front()->computeChainDataLength(), @@ -1217,9 +1217,6 @@ TEST_F(QuicTransportFunctionsTest, WriteQuicDataToSocketRetxBufferSorted) { getVersion(*conn), conn->transportSettings.writeConnectionDataPacketsLimit); EXPECT_EQ(2, stream->retransmissionBuffer.size()); - EXPECT_GT( - stream->retransmissionBuffer.back().offset, - stream->retransmissionBuffer.front().offset); } TEST_F(QuicTransportFunctionsTest, NothingWritten) { diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 286b0a5f3..d2b0e9934 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -242,22 +242,17 @@ void dropPackets(QuicServerConnectionState& conn) { } auto stream = conn.streamManager->findStream(streamFrame->streamId); ASSERT_TRUE(stream); - auto itr = std::find_if( - stream->retransmissionBuffer.begin(), - stream->retransmissionBuffer.end(), - [&streamFrame](const auto& buffer) { - return streamFrame->offset == buffer.offset; - }); + auto itr = stream->retransmissionBuffer.find(streamFrame->offset); ASSERT_TRUE(itr != stream->retransmissionBuffer.end()); stream->lossBuffer.insert( std::upper_bound( stream->lossBuffer.begin(), stream->lossBuffer.end(), - itr->offset, + itr->second.offset, [](const auto& offset, const auto& buffer) { return offset < buffer.offset; }), - std::move(*itr)); + std::move(itr->second)); stream->retransmissionBuffer.erase(itr); if (std::find( conn.streamManager->lossStreams().begin(), @@ -313,15 +308,24 @@ void verifyCorrectness( // Verify retransmissionBuffer: EXPECT_FALSE(stream->retransmissionBuffer.empty()); IOBufQueue retxBufCombined; - for (auto& retxBuf : stream->retransmissionBuffer) { - retxBufCombined.append(retxBuf.data.front()->clone()); + std::vector rtxCopy; + for (auto& itr : stream->retransmissionBuffer) { + rtxCopy.push_back(StreamBuffer( + itr.second.data.front()->clone(), itr.second.offset, itr.second.eof)); + } + std::sort(rtxCopy.begin(), rtxCopy.end(), [](auto& s1, auto& s2) { + return s1.offset < s2.offset; + }); + for (auto& s : rtxCopy) { + retxBufCombined.append(s.data.move()); } EXPECT_TRUE(IOBufEqualTo()(expected, *retxBufCombined.move())); - EXPECT_EQ(finExpected, stream->retransmissionBuffer.back().eof); + EXPECT_EQ(finExpected, stream->retransmissionBuffer.at(offsets.back()).eof); std::vector retxBufOffsets; for (const auto& b : stream->retransmissionBuffer) { - retxBufOffsets.push_back(b.offset); + retxBufOffsets.push_back(b.second.offset); } + std::sort(retxBufOffsets.begin(), retxBufOffsets.end()); EXPECT_EQ(offsets, retxBufOffsets); } @@ -1196,9 +1200,8 @@ TEST_F(QuicTransportTest, CloneAfterRecvReset) { EXPECT_EQ(1, conn.outstandingPackets.size()); auto stream = conn.streamManager->getStream(streamId); EXPECT_EQ(1, stream->retransmissionBuffer.size()); - EXPECT_EQ(0, stream->retransmissionBuffer.back().offset); - EXPECT_EQ(0, stream->retransmissionBuffer.back().data.chainLength()); - EXPECT_TRUE(stream->retransmissionBuffer.back().eof); + EXPECT_EQ(0, stream->retransmissionBuffer.at(0).data.chainLength()); + EXPECT_TRUE(stream->retransmissionBuffer.at(0).eof); EXPECT_TRUE(stream->lossBuffer.empty()); EXPECT_EQ(0, stream->writeBuffer.chainLength()); EXPECT_EQ(1, stream->currentWriteOffset); @@ -1927,8 +1930,11 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) { conn.lossState.srtt = 100us; auto streamState = conn.streamManager->getStream(stream); streamState->retransmissionBuffer.clear(); - streamState->retransmissionBuffer.emplace_back( - folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false); + streamState->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(51), + std::forward_as_tuple( + folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false)); folly::SocketAddress addr; NetworkData emptyData; @@ -1970,8 +1976,11 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) { auto streamState = conn.streamManager->getStream(stream); streamState->retransmissionBuffer.clear(); streamState->lossBuffer.clear(); - streamState->retransmissionBuffer.emplace_back( - folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false); + streamState->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(51), + std::forward_as_tuple( + folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false)); streamState->lossBuffer.emplace_back( folly::IOBuf::copyBuffer("And I'm lost"), 31, false); diff --git a/quic/codec/QuicPacketRebuilder.cpp b/quic/codec/QuicPacketRebuilder.cpp index 64f09978f..c2c99f0be 100644 --- a/quic/codec/QuicPacketRebuilder.cpp +++ b/quic/codec/QuicPacketRebuilder.cpp @@ -200,23 +200,17 @@ Buf PacketRebuilder::cloneCryptoRetransmissionBuffer( * lost packet. */ DCHECK(frame.len) << "WriteCryptoFrame cloning: frame is empty. " << conn_; - auto iter = std::lower_bound( - stream.retransmissionBuffer.begin(), - stream.retransmissionBuffer.end(), - frame.offset, - [](const auto& buffer, const auto& targetOffset) { - return buffer.offset < targetOffset; - }); + auto iter = stream.retransmissionBuffer.find(frame.offset); // If the crypto stream is canceled somehow, just skip cloning this frame if (iter == stream.retransmissionBuffer.end()) { return nullptr; } - DCHECK(iter->offset == frame.offset) + DCHECK(iter->second.offset == frame.offset) << "WriteCryptoFrame cloning: offset mismatch. " << conn_; - DCHECK(iter->data.chainLength() == frame.len) + DCHECK(iter->second.data.chainLength() == frame.len) << "WriteCryptoFrame cloning: Len mismatch. " << conn_; - return iter->data.front()->clone(); + return iter->second.data.front()->clone(); } Buf PacketRebuilder::cloneRetransmissionBuffer( @@ -236,19 +230,13 @@ Buf PacketRebuilder::cloneRetransmissionBuffer( */ DCHECK(stream); DCHECK(retransmittable(*stream)); - auto iter = std::lower_bound( - stream->retransmissionBuffer.begin(), - stream->retransmissionBuffer.end(), - frame.offset, - [](const auto& buffer, const auto& targetOffset) { - return buffer.offset < targetOffset; - }); + auto iter = stream->retransmissionBuffer.find(frame.offset); if (iter != stream->retransmissionBuffer.end()) { - if (streamFrameMatchesRetransmitBuffer(*stream, frame, *iter)) { - DCHECK(!frame.len || !iter->data.empty()) + if (streamFrameMatchesRetransmitBuffer(*stream, frame, iter->second)) { + DCHECK(!frame.len || !iter->second.data.empty()) << "WriteStreamFrame cloning: frame is not empty but StreamBuffer has" << " empty data. " << conn_; - return (frame.len ? iter->data.front()->clone() : nullptr); + return (frame.len ? iter->second.data.front()->clone() : nullptr); } } return nullptr; diff --git a/quic/codec/test/QuicPacketRebuilderTest.cpp b/quic/codec/test/QuicPacketRebuilderTest.cpp index 1673d7e01..93c45053f 100644 --- a/quic/codec/test/QuicPacketRebuilderTest.cpp +++ b/quic/codec/test/QuicPacketRebuilderTest.cpp @@ -99,12 +99,13 @@ TEST_F(QuicPacketRebuilderTest, RebuildPacket) { auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(8, packet1.packet.frames.size()); stream->retransmissionBuffer.emplace( - stream->retransmissionBuffer.begin(), buf->clone(), 0, true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(buf->clone(), 0, true)); conn.cryptoState->oneRttStream.retransmissionBuffer.emplace( - conn.cryptoState->oneRttStream.retransmissionBuffer.begin(), - cryptoBuf->clone(), - 0, - true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(cryptoBuf->clone(), 0, true)); // rebuild a packet from the built out packet ShortHeader shortHeader2( @@ -240,7 +241,9 @@ TEST_F(QuicPacketRebuilderTest, FinOnlyStreamRebuild) { writeStreamFrameHeader(regularBuilder1, streamId, 0, 0, 0, true); auto packet1 = std::move(regularBuilder1).buildPacket(); stream->retransmissionBuffer.emplace( - stream->retransmissionBuffer.begin(), nullptr, 0, true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(nullptr, 0, true)); // rebuild a packet from the built out packet ShortHeader shortHeader2( @@ -294,7 +297,9 @@ TEST_F(QuicPacketRebuilderTest, RebuildDataStreamAndEmptyCryptoStream) { auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(2, packet1.packet.frames.size()); stream->retransmissionBuffer.emplace( - stream->retransmissionBuffer.begin(), buf->clone(), 0, true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(buf->clone(), 0, true)); // Do not add the buf to crypto stream's retransmission buffer, // imagine it was cleared @@ -390,7 +395,9 @@ TEST_F(QuicPacketRebuilderTest, CannotRebuild) { auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(5, packet1.packet.frames.size()); stream->retransmissionBuffer.emplace( - stream->retransmissionBuffer.begin(), buf->clone(), 0, true); + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(buf->clone(), 0, true)); // new builder has a much smaller writable bytes limit ShortHeader shortHeader2( diff --git a/quic/loss/QuicLossFunctions.cpp b/quic/loss/QuicLossFunctions.cpp index 665a29c4a..8d57e810d 100644 --- a/quic/loss/QuicLossFunctions.cpp +++ b/quic/loss/QuicLossFunctions.cpp @@ -96,13 +96,7 @@ void markPacketLoss( if (!stream) { break; } - auto bufferItr = std::lower_bound( - stream->retransmissionBuffer.begin(), - stream->retransmissionBuffer.end(), - frame.offset, - [](const auto& buffer, const auto& offset) { - return buffer.offset < offset; - }); + auto bufferItr = stream->retransmissionBuffer.find(frame.offset); if (bufferItr == stream->retransmissionBuffer.end()) { // It's possible that the stream was reset or data on the stream was // skipped while we discovered that its packet was lost so we might @@ -111,18 +105,19 @@ void markPacketLoss( } // The original rxmt offset might have been bumped up after it was // shrunk due to egress partially reliable skip. - if (!streamFrameMatchesRetransmitBuffer(*stream, frame, *bufferItr)) { + if (!streamFrameMatchesRetransmitBuffer( + *stream, frame, bufferItr->second)) { break; } stream->lossBuffer.insert( std::upper_bound( stream->lossBuffer.begin(), stream->lossBuffer.end(), - bufferItr->offset, + bufferItr->second.offset, [](const auto& offset, const auto& buffer) { return offset < buffer.offset; }), - std::move(*bufferItr)); + std::move(bufferItr->second)); stream->retransmissionBuffer.erase(bufferItr); conn.streamManager->updateLossStreams(*stream); break; @@ -136,28 +131,22 @@ void markPacketLoss( auto encryptionLevel = protectionTypeToEncryptionLevel(protectionType); auto cryptoStream = getCryptoStream(*conn.cryptoState, encryptionLevel); - auto bufferItr = std::lower_bound( - cryptoStream->retransmissionBuffer.begin(), - cryptoStream->retransmissionBuffer.end(), - frame.offset, - [](const auto& buffer, const auto& offset) { - return buffer.offset < offset; - }); + auto bufferItr = cryptoStream->retransmissionBuffer.find(frame.offset); if (bufferItr == cryptoStream->retransmissionBuffer.end()) { // It's possible that the stream was reset while we discovered that // it's packet was lost so we might not have the offset. break; } - DCHECK_EQ(bufferItr->offset, frame.offset); + DCHECK_EQ(bufferItr->second.offset, frame.offset); cryptoStream->lossBuffer.insert( std::upper_bound( cryptoStream->lossBuffer.begin(), cryptoStream->lossBuffer.end(), - bufferItr->offset, + bufferItr->second.offset, [](const auto& offset, const auto& buffer) { return offset < buffer.offset; }), - std::move(*bufferItr)); + std::move(bufferItr->second)); cryptoStream->retransmissionBuffer.erase(bufferItr); break; } diff --git a/quic/loss/test/QuicLossFunctionsTest.cpp b/quic/loss/test/QuicLossFunctionsTest.cpp index 5c68369a6..693f6b6b2 100644 --- a/quic/loss/test/QuicLossFunctionsTest.cpp +++ b/quic/loss/test/QuicLossFunctionsTest.cpp @@ -426,9 +426,6 @@ TEST_F(QuicLossFunctionsTest, RetxBufferSortedAfterLoss) { markPacketLoss( *conn, packet.packet, false, packet.packet.header.getPacketSequenceNum()); EXPECT_EQ(2, stream->retransmissionBuffer.size()); - EXPECT_GT( - stream->retransmissionBuffer.back().offset, - stream->retransmissionBuffer.front().offset); } TEST_F(QuicLossFunctionsTest, TestMarkCryptoLostAfterCancelRetransmission) { diff --git a/quic/server/test/QuicServerTransportTest.cpp b/quic/server/test/QuicServerTransportTest.cpp index fbdd21154..5d2873069 100644 --- a/quic/server/test/QuicServerTransportTest.cpp +++ b/quic/server/test/QuicServerTransportTest.cpp @@ -1179,12 +1179,7 @@ TEST_F(QuicServerTransportTest, TestOpenAckStreamFrame) { if (!frame) { continue; } - auto it = std::find_if( - stream->retransmissionBuffer.begin(), - stream->retransmissionBuffer.end(), - [&](auto& buffer) { - return buffer.offset == frame->offset && buffer.eof == frame->fin; - }); + auto it = stream->retransmissionBuffer.find(frame->offset); ASSERT_TRUE(it != stream->retransmissionBuffer.end()); if (currentPacket == packetNum1 && frame->streamId == streamId) { buffersInPacket1++; @@ -1358,8 +1353,10 @@ TEST_F(QuicServerTransportTest, RecvRstStreamFrame) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); writeDataToQuicStream(*stream, IOBuf::copyBuffer(words.at(3)), false); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1413,8 +1410,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrame) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1455,8 +1454,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterCloseStream) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1496,8 +1497,10 @@ TEST_F(QuicServerTransportTest, RecvInvalidMaxStreamData) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1534,8 +1537,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterHalfCloseRemote) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1606,8 +1611,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterReset) { stream1->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream1->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream1->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream1->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream1->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream1->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream1->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1615,8 +1622,10 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterReset) { stream2->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream2->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); - stream2->retransmissionBuffer.emplace_back( - IOBuf::copyBuffer(words.at(2)), 0, false); + stream2->retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(IOBuf::copyBuffer(words.at(2)), 0, false)); stream2->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream2->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream2->currentReadOffset = words.at(0).length() + words.at(1).length(); diff --git a/quic/state/QPRFunctions.cpp b/quic/state/QPRFunctions.cpp index 706dfd162..a295bbd4f 100644 --- a/quic/state/QPRFunctions.cpp +++ b/quic/state/QPRFunctions.cpp @@ -34,6 +34,29 @@ void shrinkBuffers(std::deque& buffers, uint64_t offset) { } } +void shrinkBuffers( + folly::F14FastMap& buffers, + uint64_t offset) { + // Do a linear search of the entire buffer, there can be exactly one trimmed + // buffer, since we are changing the offset for that single buffer we need to + // change the offset in the StreamBuffer, but keep it keyed on the same + // offset as before so we still remove it on ack. + for (auto itr = buffers.begin(); itr != buffers.end();) { + if (itr->second.offset >= offset) { + itr++; + continue; + } + if (itr->second.offset + itr->second.data.chainLength() <= offset) { + itr = buffers.erase(itr); + } else { + uint64_t amount = offset - itr->second.offset; + itr->second.data.trimStartAtMost(amount); + itr->second.offset += amount; + itr++; + } + } +} + void shrinkRetransmittableBuffers( QuicStreamState* stream, uint64_t minimumRetransmittableOffset) { diff --git a/quic/state/QuicStreamFunctions.cpp b/quic/state/QuicStreamFunctions.cpp index 4664ec038..6d33adc9d 100644 --- a/quic/state/QuicStreamFunctions.cpp +++ b/quic/state/QuicStreamFunctions.cpp @@ -382,11 +382,14 @@ uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream) { uint64_t getStreamNextOffsetToDeliver(const QuicStreamState& stream) { auto minOffsetToDeliver = stream.currentWriteOffset; + // If the acked intervals is not empty, then the furthest acked interval + // starting at zero is the next offset. If there is no interval starting at + // zero then we cannot deliver any offsets. minOffsetToDeliver = std::min( minOffsetToDeliver, - stream.retransmissionBuffer.empty() + stream.ackedIntervals.empty() || stream.ackedIntervals.front().start != 0 ? minOffsetToDeliver - : stream.retransmissionBuffer[0].offset); + : stream.ackedIntervals.front().end); minOffsetToDeliver = std::min( minOffsetToDeliver, stream.lossBuffer.empty() ? minOffsetToDeliver @@ -426,16 +429,10 @@ void processCryptoStreamAck( QuicCryptoStream& cryptoStream, uint64_t offset, uint64_t len) { - auto ackedBuffer = std::lower_bound( - cryptoStream.retransmissionBuffer.begin(), - cryptoStream.retransmissionBuffer.end(), - offset, - [](const auto& buffer, const auto& offset) { - return buffer.offset < offset; - }); - + auto ackedBuffer = cryptoStream.retransmissionBuffer.find(offset); if (ackedBuffer == cryptoStream.retransmissionBuffer.end() || - ackedBuffer->offset != offset || ackedBuffer->data.chainLength() != len) { + ackedBuffer->second.offset != offset || + ackedBuffer->second.data.chainLength() != len) { // It's possible retransmissions of crypto data were canceled. return; } diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index 8d63ae4f2..820fe27c5 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -36,12 +36,22 @@ struct QuicStreamLike { // TODO replace with BufQueue folly::IOBufQueue writeBuffer{folly::IOBufQueue::cacheChainLength()}; - // Stores a list of buffers which have been written to the socket and are + // Stores a map of buffers which have been written to the socket and are // currently un-acked. Each one represents one StreamFrame that was written. // We need to buffer these because these might be retransmitted // in the future. - // These are sorted in order of start offset. - std::deque retransmissionBuffer; + // These are associated with the starting offset of the buffer. + // Note: the offset in the StreamBuffer itself can be >= the offset on which + // it is keyed due to partial reliability - when data is skipped the offset + // in the StreamBuffer may be incremented, but the keyed offset must remain + // the same so it can be removed from the buffer on ACK. + folly::F14FastMap retransmissionBuffer; + + // Tracks intervals which we have received ACKs for. E.g. in the case of all + // data being acked this would contain one internval from 0 -> the largest + // offseet ACKed. This allows us to track which delivery callbacks can be + // called. + IntervalSet ackedIntervals; // Stores a list of buffers which have been marked as loss by loss detector. // Each one represents one StreamFrame that was written. diff --git a/quic/state/stream/StreamSendHandlers.cpp b/quic/state/stream/StreamSendHandlers.cpp index a004ffbb4..83e4ee6b4 100644 --- a/quic/state/stream/StreamSendHandlers.cpp +++ b/quic/state/stream/StreamSendHandlers.cpp @@ -91,28 +91,25 @@ void sendAckSMHandler( switch (stream.sendState) { case StreamSendState::Open_E: { // Clean up the acked buffers from the retransmissionBuffer. - auto ackedBuffer = std::lower_bound( - stream.retransmissionBuffer.begin(), - stream.retransmissionBuffer.end(), - ackedFrame.offset, - [](const auto& buffer, const auto& offset) { - return buffer.offset < offset; - }); - + auto ackedBuffer = stream.retransmissionBuffer.find(ackedFrame.offset); if (ackedBuffer != stream.retransmissionBuffer.end()) { if (streamFrameMatchesRetransmitBuffer( - stream, ackedFrame, *ackedBuffer)) { + stream, ackedFrame, ackedBuffer->second)) { VLOG(10) << "Open: acked stream data stream=" << stream.id - << " offset=" << ackedBuffer->offset - << " len=" << ackedBuffer->data.chainLength() - << " eof=" << ackedBuffer->eof << " " << stream.conn; + << " offset=" << ackedBuffer->second.offset + << " len=" << ackedBuffer->second.data.chainLength() + << " eof=" << ackedBuffer->second.eof << " " << stream.conn; + stream.ackedIntervals.insert( + ackedBuffer->second.offset, + ackedBuffer->second.offset + + ackedBuffer->second.data.chainLength()); stream.retransmissionBuffer.erase(ackedBuffer); } else { VLOG(10) << "Open: received an ack for already discarded buffer; stream=" - << stream.id << " offset=" << ackedBuffer->offset - << " len=" << ackedBuffer->data.chainLength() - << " eof=" << ackedBuffer->eof << " " << stream.conn; + << stream.id << " offset=" << ackedBuffer->second.offset + << " len=" << ackedBuffer->second.data.chainLength() + << " eof=" << ackedBuffer->second.eof << " " << stream.conn; } } diff --git a/quic/state/stream/test/StreamStateFunctionsTest.cpp b/quic/state/stream/test/StreamStateFunctionsTest.cpp index 74956b21f..b3e5ac906 100644 --- a/quic/state/stream/test/StreamStateFunctionsTest.cpp +++ b/quic/state/stream/test/StreamStateFunctionsTest.cpp @@ -32,8 +32,8 @@ TEST_F(StreamStateFunctionsTests, BasicResetTest) { StreamBuffer(folly::IOBuf::copyBuffer(" It is not a hotdog."), 15)); writeDataToQuicStream( stream, folly::IOBuf::copyBuffer("What is it then?"), false); - stream.retransmissionBuffer.emplace_back( - folly::IOBuf::copyBuffer("How would I know?"), 34); + stream.retransmissionBuffer.emplace( + 34, StreamBuffer(folly::IOBuf::copyBuffer("How would I know?"), 34)); auto currentWriteOffset = stream.currentWriteOffset; auto currentReadOffset = stream.currentReadOffset; EXPECT_TRUE(stream.writable()); diff --git a/quic/state/stream/test/StreamStateMachineTest.cpp b/quic/state/stream/test/StreamStateMachineTest.cpp index 4e136d0e3..6e5583ce2 100644 --- a/quic/state/stream/test/StreamStateMachineTest.cpp +++ b/quic/state/stream/test/StreamStateMachineTest.cpp @@ -175,6 +175,69 @@ TEST_F(QuicOpenStateTest, AckStream) { ASSERT_EQ(stream->sendState, StreamSendState::Closed_E); } +TEST_F(QuicOpenStateTest, AckStreamMulti) { + auto conn = createConn(); + + auto stream = conn->streamManager->createNextBidirectionalStream().value(); + folly::Optional serverChosenConnId = *conn->clientConnectionId; + serverChosenConnId.value().data()[0] ^= 0x01; + EventBase evb; + auto sock = std::make_unique(&evb); + + auto buf = IOBuf::copyBuffer("hello"); + writeQuicPacket( + *conn, + *conn->clientConnectionId, + *serverChosenConnId, + *sock, + *stream, + *buf, + false); + writeQuicPacket( + *conn, + *conn->clientConnectionId, + *serverChosenConnId, + *sock, + *stream, + *IOBuf::copyBuffer("world"), + false); + writeQuicPacket( + *conn, + *conn->clientConnectionId, + *serverChosenConnId, + *sock, + *stream, + *IOBuf::copyBuffer("this is bob"), + false); + + EXPECT_EQ(stream->retransmissionBuffer.size(), 3); + EXPECT_EQ(3, conn->outstandingPackets.size()); + + auto& streamFrame3 = + *conn->outstandingPackets[2].packet.frames[0].asWriteStreamFrame(); + + sendAckSMHandler(*stream, streamFrame3); + ASSERT_EQ(stream->sendState, StreamSendState::Open_E); + ASSERT_EQ(stream->ackedIntervals.front().start, 10); + ASSERT_EQ(stream->ackedIntervals.front().end, 21); + + auto& streamFrame2 = + *conn->outstandingPackets[1].packet.frames[0].asWriteStreamFrame(); + + sendAckSMHandler(*stream, streamFrame2); + ASSERT_EQ(stream->sendState, StreamSendState::Open_E); + ASSERT_EQ(stream->ackedIntervals.front().start, 5); + ASSERT_EQ(stream->ackedIntervals.front().end, 21); + + auto& streamFrame1 = + *conn->outstandingPackets[0].packet.frames[0].asWriteStreamFrame(); + + sendAckSMHandler(*stream, streamFrame1); + ASSERT_EQ(stream->sendState, StreamSendState::Open_E); + ASSERT_EQ(stream->ackedIntervals.front().start, 0); + ASSERT_EQ(stream->ackedIntervals.front().end, 21); +} + TEST_F(QuicOpenStateTest, RetxBufferSortedAfterAck) { auto conn = createConn(); auto stream = conn->streamManager->createNextBidirectionalStream().value(); @@ -219,9 +282,6 @@ TEST_F(QuicOpenStateTest, RetxBufferSortedAfterAck) { .asWriteStreamFrame(); sendAckSMHandler(*stream, streamFrame); EXPECT_EQ(2, stream->retransmissionBuffer.size()); - EXPECT_GT( - stream->retransmissionBuffer.back().offset, - stream->retransmissionBuffer.front().offset); } TEST_F(QuicOpenStateTest, AckStreamAfterSkip) { @@ -933,7 +993,10 @@ TEST_F(QuicUnidirectionalStreamTest, OpenFinalAckStreamFrame) { stream.currentWriteOffset = 2; auto buf = folly::IOBuf::create(1); buf->append(1); - stream.retransmissionBuffer.emplace_back(std::move(buf), 1, false); + stream.retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(1), + std::forward_as_tuple(std::move(buf), 1, false)); sendAckSMHandler(stream, streamFrame); EXPECT_EQ(stream.sendState, StreamSendState::Closed_E); EXPECT_EQ(stream.recvState, StreamRecvState::Invalid_E); diff --git a/quic/state/test/QPRFunctionsTest.cpp b/quic/state/test/QPRFunctionsTest.cpp index 705b94d36..92c52cdbb 100644 --- a/quic/state/test/QPRFunctionsTest.cpp +++ b/quic/state/test/QPRFunctionsTest.cpp @@ -92,7 +92,7 @@ TEST_F(QPRFunctionsTest, AdvanceMinimumRetransmittableOffset) { auto buf = folly::IOBuf::copyBuffer("aaaaaaaaaa"); // case2. has no unacked data below 139 stream->currentWriteOffset = 150; - stream->retransmissionBuffer.emplace_back(StreamBuffer(buf->clone(), 140)); + stream->retransmissionBuffer.emplace(140, StreamBuffer(buf->clone(), 140)); result = advanceMinimumRetransmittableOffset(stream, 139); EXPECT_TRUE(result.hasValue()); EXPECT_EQ(*result, 139); @@ -101,7 +101,7 @@ TEST_F(QPRFunctionsTest, AdvanceMinimumRetransmittableOffset) { // case3. ExpiredStreamDataFrame is wired stream->minimumRetransmittableOffset = 139; - stream->retransmissionBuffer.emplace_back(StreamBuffer(buf->clone(), 140)); + stream->retransmissionBuffer.emplace(140, StreamBuffer(buf->clone(), 140)); result = advanceMinimumRetransmittableOffset(stream, 150); EXPECT_TRUE(result.hasValue()); EXPECT_EQ(*result, 150); @@ -117,7 +117,7 @@ TEST_F(QPRFunctionsTest, AdvanceMinimumRetransmittableOffset) { // case4. update existing pending event. stream->minimumRetransmittableOffset = 150; - stream->retransmissionBuffer.emplace_back(StreamBuffer(buf->clone(), 150)); + stream->retransmissionBuffer.emplace(150, StreamBuffer(buf->clone(), 150)); stream->conn.pendingEvents.frames.clear(); stream->conn.pendingEvents.frames.emplace_back( ExpiredStreamDataFrame(stream->id, 160)); @@ -171,8 +171,8 @@ TEST_F(QPRFunctionsTest, RecvMinStreamDataFrameShrinkBuffer) { stream->writeBuffer.append(std::move(buf)); stream->conn.flowControlState.sumCurStreamBufferLen = 20; auto writtenBuffer = folly::IOBuf::copyBuffer("cccccccccc"); - stream->retransmissionBuffer.emplace_back( - StreamBuffer(std::move(writtenBuffer), 90, false)); + stream->retransmissionBuffer.emplace( + 90, StreamBuffer(std::move(writtenBuffer), 90, false)); MinStreamDataFrame shrinkMinStreamDataFrame( stream->id, stream->flowControlState.peerAdvertisedMaxOffset, 110); onRecvMinStreamDataFrame(stream, shrinkMinStreamDataFrame, packetNum); diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index d7f8ad5a1..e8816c025 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -1810,7 +1810,8 @@ TEST_F(QuicStreamFunctionsTest, AllBytesTillFinAckedStillRetransmitting) { StreamId id = 3; QuicStreamState stream(id, conn); stream.finalWriteOffset = 12; - stream.retransmissionBuffer.emplace_back(IOBuf::create(10), 10, false); + stream.retransmissionBuffer.emplace( + 0, StreamBuffer(IOBuf::create(10), 10, false)); EXPECT_FALSE(allBytesTillFinAcked(stream)); } @@ -1882,15 +1883,18 @@ TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliver) { TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverRetxBuffer) { QuicStreamState stream(3, conn); stream.currentWriteOffset = 100; - stream.retransmissionBuffer.emplace_back(buildRandomInputData(10), 50); - EXPECT_EQ(50, getStreamNextOffsetToDeliver(stream)); + stream.retransmissionBuffer.emplace( + 50, StreamBuffer(buildRandomInputData(10), 50)); + stream.ackedIntervals.insert(0, 49); + EXPECT_EQ(49, getStreamNextOffsetToDeliver(stream)); } TEST_F(QuicStreamFunctionsTest, StreamNextOffsetToDeliverRetxAndLossBuffer) { QuicStreamState stream(3, conn); stream.currentWriteOffset = 100; stream.lossBuffer.emplace_back(buildRandomInputData(10), 30); - stream.retransmissionBuffer.emplace_back(buildRandomInputData(10), 50); + stream.retransmissionBuffer.emplace( + 50, StreamBuffer(buildRandomInputData(10), 50)); EXPECT_EQ(30, getStreamNextOffsetToDeliver(stream)); } @@ -1962,8 +1966,8 @@ TEST_F(QuicStreamFunctionsTest, WritableList) { TEST_F(QuicStreamFunctionsTest, AckCryptoStream) { auto chlo = IOBuf::copyBuffer("CHLO"); - conn.cryptoState->handshakeStream.retransmissionBuffer.emplace_back( - StreamBuffer(chlo->clone(), 0)); + conn.cryptoState->handshakeStream.retransmissionBuffer.emplace( + 0, StreamBuffer(chlo->clone(), 0)); processCryptoStreamAck(conn.cryptoState->handshakeStream, 0, chlo->length()); EXPECT_EQ(conn.cryptoState->handshakeStream.retransmissionBuffer.size(), 0); } @@ -1971,8 +1975,7 @@ TEST_F(QuicStreamFunctionsTest, AckCryptoStream) { TEST_F(QuicStreamFunctionsTest, AckCryptoStreamOffsetLengthMismatch) { auto chlo = IOBuf::copyBuffer("CHLO"); auto& cryptoStream = conn.cryptoState->handshakeStream; - cryptoStream.retransmissionBuffer.emplace_back( - StreamBuffer(chlo->clone(), 0)); + cryptoStream.retransmissionBuffer.emplace(0, StreamBuffer(chlo->clone(), 0)); processCryptoStreamAck(cryptoStream, 1, chlo->length()); EXPECT_EQ(cryptoStream.retransmissionBuffer.size(), 1);