From bc386475e5c375c3cf4f6379b314ae6fa2571623 Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Thu, 15 Aug 2024 05:46:08 -0700 Subject: [PATCH] Integrate RangeChain into write path of QUIC stack Summary: See title Reviewed By: mjoras Differential Revision: D58216871 fbshipit-source-id: 9afc08946a676ec967c998416a6470d4884af550 --- quic/api/BUCK | 4 + quic/api/QuicPacketScheduler.cpp | 10 +- quic/api/QuicTransportBase.cpp | 6 +- quic/api/QuicTransportFunctions.cpp | 43 ++++--- quic/api/QuicTransportFunctions.h | 2 +- quic/api/test/BUCK | 1 + quic/api/test/QuicPacketSchedulerTest.cpp | 50 ++++---- quic/api/test/QuicTransportBaseTest.cpp | 24 +++- quic/api/test/QuicTransportFunctionsTest.cpp | 112 +++++++++++------- quic/api/test/QuicTransportTest.cpp | 32 ++--- quic/codec/QuicPacketRebuilder.cpp | 4 +- quic/codec/QuicPacketRebuilder.h | 4 +- quic/codec/QuicWriteCodec.cpp | 11 +- quic/codec/QuicWriteCodec.h | 7 +- quic/codec/test/QuicPacketBuilderTest.cpp | 4 +- quic/codec/test/QuicPacketRebuilderTest.cpp | 60 +++++----- quic/codec/test/QuicReadCodecTest.cpp | 15 ++- quic/common/ChainedByteRange.cpp | 15 +++ quic/common/ChainedByteRange.h | 13 +- quic/common/test/TestUtils.cpp | 14 +-- quic/common/test/TestUtils.h | 4 +- quic/dsr/backend/test/DSRPacketizerTest.cpp | 1 + quic/dsr/frontend/Scheduler.cpp | 8 +- quic/dsr/frontend/test/WriteFunctionsTest.cpp | 19 +-- .../client/test/QuicClientTransportTest.cpp | 17 ++- .../client/test/QuicClientTransportTestUtil.h | 6 +- quic/flowcontrol/QuicFlowController.cpp | 10 +- quic/flowcontrol/test/QuicFlowControlTest.cpp | 13 +- quic/loss/test/QuicLossFunctionsTest.cpp | 73 +++++++++--- quic/server/QuicServerTransport.cpp | 5 +- quic/server/test/QuicServerTransportTest.cpp | 46 ++++--- .../server/test/QuicServerTransportTestUtil.h | 6 +- quic/state/QuicStreamFunctions.cpp | 13 +- quic/state/StreamData.h | 53 ++++++--- quic/state/stream/StreamSendHandlers.cpp | 2 +- quic/state/stream/StreamStateFunctions.cpp | 1 + .../stream/test/StreamStateFunctionsTest.cpp | 6 +- .../stream/test/StreamStateMachineTest.cpp | 6 +- quic/state/test/AckHandlersTest.cpp | 28 +++-- quic/state/test/QuicStreamFunctionsTest.cpp | 24 ++-- 40 files changed, 496 insertions(+), 276 deletions(-) diff --git a/quic/api/BUCK b/quic/api/BUCK index 703eedba8..6d4bc1609 100644 --- a/quic/api/BUCK +++ b/quic/api/BUCK @@ -14,6 +14,9 @@ mvfst_cpp_library( "QuicBatchWriterFactory.h", "QuicGsoBatchWriters.h", ], + deps = [ + "//quic/common:buf_accessor", + ], exported_deps = [ "//folly:network_address", "//folly:portability", @@ -45,6 +48,7 @@ mvfst_cpp_library( "//folly:chrono", "//folly:scope_guard", "//folly/tracing:static_tracepoint", + "//quic/common:buf_accessor", "//quic/common:socket_util", "//quic/common:time_util", "//quic/congestion_control:ecn_l4s_tracker", diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index cf21913da..3de0d8715 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -538,7 +538,7 @@ bool StreamFrameScheduler::writeStreamFrame( uint64_t flowControlLen = std::min(getSendStreamFlowControlBytesWire(stream), connWritableBytes); - uint64_t bufferLen = stream.writeBuffer.chainLength(); + uint64_t bufferLen = stream.pendingWrites.chainLength(); // We should never write a FIN from the non-DSR scheduler for a DSR stream. bool canWriteFin = stream.finalWriteOffset.has_value() && bufferLen <= flowControlLen && stream.writeBufMeta.offset == 0; @@ -555,7 +555,7 @@ bool StreamFrameScheduler::writeStreamFrame( if (!dataLen) { return false; } - writeStreamFrameData(builder, stream.writeBuffer, *dataLen); + writeStreamFrameData(builder, stream.pendingWrites, *dataLen); VLOG(4) << "Wrote stream frame stream=" << stream.id << " offset=" << stream.currentWriteOffset << " bytesWritten=" << *dataLen @@ -803,7 +803,7 @@ CryptoStreamScheduler::CryptoStreamScheduler( bool CryptoStreamScheduler::writeCryptoData(PacketBuilderInterface& builder) { bool cryptoDataWritten = false; uint64_t writableData = - folly::to(cryptoStream_.writeBuffer.chainLength()); + folly::to(cryptoStream_.pendingWrites.chainLength()); // We use the crypto scheduler to reschedule the retransmissions of the // crypto streams so that we know that retransmissions of the crypto data // will always take precedence over the crypto data. @@ -819,7 +819,7 @@ bool CryptoStreamScheduler::writeCryptoData(PacketBuilderInterface& builder) { if (writableData != 0) { auto res = writeCryptoFrame( - cryptoStream_.currentWriteOffset, cryptoStream_.writeBuffer, builder); + cryptoStream_.currentWriteOffset, cryptoStream_.pendingWrites, builder); if (res) { VLOG(4) << "Wrote crypto frame" << " offset=" << cryptoStream_.currentWriteOffset @@ -831,7 +831,7 @@ bool CryptoStreamScheduler::writeCryptoData(PacketBuilderInterface& builder) { } bool CryptoStreamScheduler::hasData() const { - return !cryptoStream_.writeBuffer.empty() || + return !cryptoStream_.pendingWrites.empty() || !cryptoStream_.lossBuffer.empty(); } diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index fb8c33a18..c131444d8 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -560,7 +560,7 @@ QuicTransportBase::getStreamWriteBufferedBytes(StreamId id) const { } try { auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id)); - return stream->writeBuffer.chainLength(); + return stream->pendingWrites.chainLength(); } catch (const QuicInternalException& ex) { VLOG(4) << __func__ << " " << ex.what() << " " << *this; return folly::makeUnexpected(ex.errorCode()); @@ -1672,6 +1672,10 @@ void QuicTransportBase::handleDeliveryCallbacks() { auto maxOffsetToDeliver = getLargestDeliverableOffset(*stream); while (maxOffsetToDeliver.has_value()) { + size_t amountTrimmed = stream->writeBuffer.trimStartAtMost( + *maxOffsetToDeliver - stream->writeBufferStartOffset); + stream->writeBufferStartOffset += amountTrimmed; + auto deliveryCallbacksForAckedStream = deliveryCallbacks_.find(streamId); if (deliveryCallbacksForAckedStream == deliveryCallbacks_.end() || deliveryCallbacksForAckedStream->second.empty()) { diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index f113e1711..203302ecb 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -31,13 +31,13 @@ namespace { */ bool cryptoHasWritableData(const quic::QuicConnectionStateBase& conn) { return (conn.initialWriteCipher && - (!conn.cryptoState->initialStream.writeBuffer.empty() || + (!conn.cryptoState->initialStream.pendingWrites.empty() || !conn.cryptoState->initialStream.lossBuffer.empty())) || (conn.handshakeWriteCipher && - (!conn.cryptoState->handshakeStream.writeBuffer.empty() || + (!conn.cryptoState->handshakeStream.pendingWrites.empty() || !conn.cryptoState->handshakeStream.lossBuffer.empty())) || (conn.oneRttWriteCipher && - (!conn.cryptoState->oneRttStream.writeBuffer.empty() || + (!conn.cryptoState->oneRttStream.pendingWrites.empty() || !conn.cryptoState->oneRttStream.lossBuffer.empty())); } @@ -415,8 +415,9 @@ void handleNewStreamDataWritten( // Idealy we should also check this data doesn't exist in either retx buffer // or loss buffer, but that's an expensive search. stream.currentWriteOffset += frameLen; - auto bufWritten = stream.writeBuffer.splitAtMost(folly::to(frameLen)); - DCHECK_EQ(bufWritten->computeChainDataLength(), frameLen); + ChainedByteRangeHead bufWritten( + stream.pendingWrites.splitAtMost(folly::to(frameLen))); + DCHECK_EQ(bufWritten.chainLength(), frameLen); // TODO: If we want to be able to write FIN out of order for DSR-ed streams, // this needs to be fixed: stream.currentWriteOffset += frameFin ? 1 : 0; @@ -424,7 +425,7 @@ void handleNewStreamDataWritten( .emplace( std::piecewise_construct, std::forward_as_tuple(originalOffset), - std::forward_as_tuple(std::make_unique( + std::forward_as_tuple(std::make_unique( std::move(bufWritten), originalOffset, frameFin))) .second); } @@ -456,24 +457,30 @@ void handleRetransmissionWritten( uint64_t frameOffset, uint64_t frameLen, bool frameFin, - CircularDeque::iterator lossBufferIter) { + CircularDeque::iterator lossBufferIter) { auto bufferLen = lossBufferIter->data.chainLength(); - Buf bufWritten; if (frameLen == bufferLen && frameFin == lossBufferIter->eof) { // The buffer is entirely retransmitted - bufWritten = lossBufferIter->data.move(); + ChainedByteRangeHead bufWritten(std::move(lossBufferIter->data)); stream.lossBuffer.erase(lossBufferIter); + CHECK(stream.retransmissionBuffer + .emplace( + std::piecewise_construct, + std::forward_as_tuple(frameOffset), + std::forward_as_tuple(std::make_unique( + std::move(bufWritten), frameOffset, frameFin))) + .second); } else { lossBufferIter->offset += frameLen; - bufWritten = lossBufferIter->data.splitAtMost(frameLen); + ChainedByteRangeHead bufWritten(lossBufferIter->data.splitAtMost(frameLen)); + CHECK(stream.retransmissionBuffer + .emplace( + std::piecewise_construct, + std::forward_as_tuple(frameOffset), + std::forward_as_tuple(std::make_unique( + std::move(bufWritten), frameOffset, frameFin))) + .second); } - CHECK(stream.retransmissionBuffer - .emplace( - std::piecewise_construct, - std::forward_as_tuple(frameOffset), - std::forward_as_tuple(std::make_unique( - std::move(bufWritten), frameOffset, frameFin))) - .second); } void handleRetransmissionBufMetaWritten( @@ -1899,7 +1906,7 @@ void implicitAckCryptoStream( cryptoStream->lossBuffer.clear(); CHECK(cryptoStream->retransmissionBuffer.empty()); // The write buffer should be empty, there's no optional crypto data. - CHECK(cryptoStream->writeBuffer.empty()); + CHECK(cryptoStream->pendingWrites.empty()); } void handshakeConfirmed(QuicConnectionStateBase& conn) { diff --git a/quic/api/QuicTransportFunctions.h b/quic/api/QuicTransportFunctions.h index 921871759..41634fb99 100644 --- a/quic/api/QuicTransportFunctions.h +++ b/quic/api/QuicTransportFunctions.h @@ -172,7 +172,7 @@ void handleRetransmissionWritten( uint64_t frameOffset, uint64_t frameLen, bool frameFin, - CircularDeque::iterator lossBufferIter); + CircularDeque::iterator lossBufferIter); /** * Update the connection and stream state after stream data is written and deal diff --git a/quic/api/test/BUCK b/quic/api/test/BUCK index d161121a3..986befe90 100644 --- a/quic/api/test/BUCK +++ b/quic/api/test/BUCK @@ -97,6 +97,7 @@ cpp_unittest( ], deps = [ ":mocks", + "//folly:range", "//quic/api:transport", "//quic/common/events:folly_eventbase", "//quic/common/test:test_utils", diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 832496ae1..6d4d8dc75 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -321,8 +321,10 @@ TEST_F(QuicPacketSchedulerTest, CryptoPaddingRetransmissionClientInitial) { "CryptoOnlyScheduler") .cryptoFrames()) .build(); + Buf helloBuf = folly::IOBuf::copyBuffer("chlo"); + ChainedByteRangeHead clientHelloData(helloBuf); conn.cryptoState->initialStream.lossBuffer.push_back( - StreamBuffer{folly::IOBuf::copyBuffer("chlo"), 0, false}); + WriteStreamBuffer{std::move(clientHelloData), 0, false}); auto result = scheduler.scheduleFramesForPacket( std::move(builder), conn.udpSendPacketLen); auto packetLength = result.packet->header.computeChainDataLength() + @@ -348,13 +350,15 @@ TEST_F(QuicPacketSchedulerTest, CryptoSchedulerOnlySingleLossFits) { PacketBuilderWrapper builderWrapper(builder, 13); CryptoStreamScheduler scheduler( conn, *getCryptoStream(*conn.cryptoState, EncryptionLevel::Handshake)); - conn.cryptoState->handshakeStream.lossBuffer.push_back( - StreamBuffer{folly::IOBuf::copyBuffer("shlo"), 0, false}); - conn.cryptoState->handshakeStream.lossBuffer.push_back(StreamBuffer{ - folly::IOBuf::copyBuffer( - "certificatethatisverylongseriouslythisisextremelylongandcannotfitintoapacket"), - 7, - false}); + + Buf helloBuf = folly::IOBuf::copyBuffer("shlo"); + Buf certBuf = folly::IOBuf::copyBuffer( + "certificatethatisverylongseriouslythisisextremelylongandcannotfitintoapacket"); + + conn.cryptoState->handshakeStream.lossBuffer.emplace_back( + ChainedByteRangeHead(helloBuf), 0, false); + conn.cryptoState->handshakeStream.lossBuffer.emplace_back( + ChainedByteRangeHead(certBuf), 7, false); EXPECT_TRUE(scheduler.writeCryptoData(builderWrapper)); } @@ -382,10 +386,10 @@ TEST_F(QuicPacketSchedulerTest, CryptoWritePartialLossBuffer) { "CryptoOnlyScheduler") .cryptoFrames()) .build(); - conn.cryptoState->initialStream.lossBuffer.push_back(StreamBuffer{ - folly::IOBuf::copyBuffer("return the special duration value max"), - 0, - false}); + Buf lossBuffer = + folly::IOBuf::copyBuffer("return the special duration value max"); + conn.cryptoState->initialStream.lossBuffer.emplace_back( + ChainedByteRangeHead(lossBuffer), 0, false); auto result = cryptoOnlyScheduler.scheduleFramesForPacket( std::move(builder), conn.udpSendPacketLen); auto packetLength = result.packet->header.computeChainDataLength() + @@ -662,11 +666,12 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasHandshakeDataAndAcks) { // Add some crypto data for the outstanding packet to make it look legit. // This is so cloning scheduler can actually copy something. + Buf cryptoBuf = folly::IOBuf::copyBuffer("test"); + ChainedByteRangeHead cryptoRch(cryptoBuf); getCryptoStream(*conn.cryptoState, EncryptionLevel::Handshake) ->retransmissionBuffer.emplace( 0, - std::make_unique( - folly::IOBuf::copyBuffer("test"), 0, false)); + std::make_unique(std::move(cryptoRch), 0, false)); conn.outstandings.packets.back().packet.frames.push_back( WriteCryptoFrame(0, 4)); @@ -1493,6 +1498,8 @@ TEST_F( dsrStream->ackedIntervals.insert(0, dsrStream->writeBuffer.chainLength() - 1); dsrStream->currentWriteOffset = dsrStream->writeBuffer.chainLength(); dsrStream->writeBuffer.move(); + ChainedByteRangeHead(std::move( + dsrStream->pendingWrites)); // Move and destruct the pending writes conn.streamManager->updateWritableStreams(*dsrStream); // The default is to wraparound initially. @@ -1909,8 +1916,11 @@ TEST_F(QuicPacketSchedulerTest, HighPriNewDataBeforeLowPriLossData) { auto lowPriStream = conn.streamManager->findStream(lowPriStreamId); auto highPriStream = conn.streamManager->findStream(highPriStreamId); + Buf lossBuffer = folly::IOBuf::copyBuffer("Onegin"); + ChainedByteRangeHead lossBufferRch(lossBuffer); + lowPriStream->lossBuffer.push_back( - StreamBuffer(folly::IOBuf::copyBuffer("Onegin"), 0, false)); + WriteStreamBuffer(std::move(lossBufferRch), 0, false)); conn.streamManager->updateWritableStreams(*lowPriStream); conn.streamManager->setStreamPriority(lowPriStream->id, Priority(5, false)); @@ -1969,7 +1979,7 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControl) { auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); EXPECT_EQ(streamId, writeStreamFrame1.streamId); EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); - EXPECT_EQ(0, stream->writeBuffer.chainLength()); + EXPECT_EQ(0, stream->pendingWrites.chainLength()); EXPECT_EQ(1, stream->retransmissionBuffer.size()); EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); @@ -2041,7 +2051,7 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlIgnoreDSR) { auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); EXPECT_EQ(streamId, writeStreamFrame1.streamId); EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); - EXPECT_EQ(0, stream->writeBuffer.chainLength()); + EXPECT_EQ(0, stream->pendingWrites.chainLength()); EXPECT_EQ(1, stream->retransmissionBuffer.size()); EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); @@ -2081,7 +2091,7 @@ TEST_F(QuicPacketSchedulerTest, WriteLossWithoutFlowControlSequential) { auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); EXPECT_EQ(streamId, writeStreamFrame1.streamId); EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); - EXPECT_EQ(0, stream->writeBuffer.chainLength()); + EXPECT_EQ(0, stream->pendingWrites.chainLength()); EXPECT_EQ(1, stream->retransmissionBuffer.size()); EXPECT_EQ(1000, stream->retransmissionBuffer[0]->data.chainLength()); @@ -2133,7 +2143,7 @@ TEST_F(QuicPacketSchedulerTest, RunOutFlowControlDuringStreamWrite) { // Fake a loss data for stream2: stream2->currentWriteOffset = 201; auto lossData = buildRandomInputData(200); - stream2->lossBuffer.emplace_back(std::move(lossData), 0, true); + stream2->lossBuffer.emplace_back(ChainedByteRangeHead(lossData), 0, true); conn.streamManager->updateWritableStreams(*stream2); StreamFrameScheduler scheduler(conn); @@ -2154,7 +2164,7 @@ TEST_F(QuicPacketSchedulerTest, RunOutFlowControlDuringStreamWrite) { auto& writeStreamFrame1 = *packet1.frames[0].asWriteStreamFrame(); EXPECT_EQ(streamId1, writeStreamFrame1.streamId); EXPECT_EQ(0, getSendConnFlowControlBytesWire(conn)); - EXPECT_EQ(0, stream1->writeBuffer.chainLength()); + EXPECT_EQ(0, stream1->pendingWrites.chainLength()); EXPECT_EQ(1, stream1->retransmissionBuffer.size()); EXPECT_EQ(1000, stream1->retransmissionBuffer[0]->data.chainLength()); diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index ccb5389b1..50a3b7a5a 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -4682,7 +4682,9 @@ TEST_P( auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; auto stream = transport->getStream(streamId); - stream->writeBuffer.append(IOBuf::copyBuffer("hello")); + auto dataBuf = IOBuf::copyBuffer("hello"); + stream->pendingWrites.append(dataBuf); + stream->writeBuffer.append(std::move(dataBuf)); // Insert streamId into the list. conn->streamManager->addWritable(*stream); @@ -4744,7 +4746,9 @@ TEST_P( auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; auto stream = transport->getStream(streamId); - stream->writeBuffer.append(IOBuf::copyBuffer("hello")); + auto dataBuf = IOBuf::copyBuffer("hello"); + stream->pendingWrites.append(dataBuf); + stream->writeBuffer.append(std::move(dataBuf)); // Insert streamId into the list. conn->streamManager->addWritable(*stream); @@ -4778,7 +4782,9 @@ TEST_P( auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; auto stream = transport->getStream(streamId); - stream->writeBuffer.append(IOBuf::copyBuffer("hello")); + auto dataBuf = IOBuf::copyBuffer("hello"); + stream->pendingWrites.append(dataBuf); + stream->writeBuffer.append(std::move(dataBuf)); // Insert streamId into the list. conn->streamManager->addWritable(*stream); @@ -4816,7 +4822,9 @@ TEST_P( auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; auto stream = transport->getStream(streamId); - stream->writeBuffer.append(IOBuf::copyBuffer("hello")); + auto dataBuf = IOBuf::copyBuffer("hello"); + stream->pendingWrites.append(dataBuf); + stream->writeBuffer.append(std::move(dataBuf)); // Insert streamId into the list. conn->streamManager->addWritable(*stream); @@ -4882,7 +4890,9 @@ TEST_P( const auto& conn = transport->transportConn; auto stream = transport->getStream(streamId); std::string testString = "hello"; - stream->writeBuffer.append(IOBuf::copyBuffer(testString)); + auto dataBuf = IOBuf::copyBuffer(testString); + stream->pendingWrites.append(dataBuf); + stream->writeBuffer.append(std::move(dataBuf)); conn->flowControlState.sumCurStreamBufferLen = testString.length(); // Insert streamId into the list. @@ -4950,7 +4960,9 @@ TEST_P( auto streamId = transport->createBidirectionalStream().value(); const auto& conn = transport->transportConn; auto stream = transport->getStream(streamId); - stream->writeBuffer.append(IOBuf::copyBuffer("hello")); + auto dataBuf = IOBuf::copyBuffer("hello"); + stream->pendingWrites.append(dataBuf); + stream->writeBuffer.append(std::move(dataBuf)); // Insert streamId into the list. conn->streamManager->addWritable(*stream); diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index 20c65dbc0..d661a21ed 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -5,6 +5,7 @@ * LICENSE file in the root directory of this source tree. */ +#include #include #include @@ -278,17 +279,20 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { EXPECT_EQ(stream2->currentWriteOffset, 13); EXPECT_EQ(conn->flowControlState.sumCurWriteOffset, 17); - IOBufEqualTo eq; - EXPECT_EQ(stream1->retransmissionBuffer.size(), 1); auto& rt1 = *stream1->retransmissionBuffer.at(0); EXPECT_EQ(rt1.offset, 0); - EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt1.data.front())); + std::string expected = "hey w"; + EXPECT_EQ( + folly::ByteRange((uint8_t*)expected.data(), expected.size()), + rt1.data.getHead()->getRange()); EXPECT_EQ(stream2->retransmissionBuffer.size(), 1); auto& rt2 = *stream2->retransmissionBuffer.at(0); EXPECT_EQ(rt2.offset, 0); - EXPECT_TRUE(eq(*buf, *rt2.data.front())); + EXPECT_EQ( + folly::ByteRange(buf->buffer(), buf->length()), + rt2.data.getHead()->getRange()); EXPECT_TRUE(rt2.eof); // Testing retransmission @@ -342,22 +346,34 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { EXPECT_EQ(stream1->lossBuffer.size(), 0); EXPECT_EQ(stream1->retransmissionBuffer.size(), 2); auto& rt3 = *stream1->retransmissionBuffer.at(5); - EXPECT_TRUE(eq(IOBuf::copyBuffer("hats up"), rt3.data.move())); + expected = "hats up"; + EXPECT_EQ( + folly::ByteRange((uint8_t*)expected.data(), expected.size()), + rt3.data.getHead()->getRange()); auto& rt4 = *stream1->retransmissionBuffer.at(0); - EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt4.data.front())); + expected = "hey w"; + EXPECT_EQ( + folly::ByteRange((uint8_t*)expected.data(), expected.size()), + rt4.data.getHead()->getRange()); // loss buffer should be split into 2. Part in retransmission buffer and // part remains in loss buffer. EXPECT_EQ(stream2->lossBuffer.size(), 1); EXPECT_EQ(stream2->retransmissionBuffer.size(), 1); auto& rt5 = *stream2->retransmissionBuffer.at(0); - EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey wh"), *rt5.data.front())); + expected = "hey wh"; + EXPECT_EQ( + folly::ByteRange((uint8_t*)expected.data(), expected.size()), + rt5.data.getHead()->getRange()); EXPECT_EQ(rt5.offset, 0); EXPECT_EQ(rt5.eof, 0); auto& rt6 = stream2->lossBuffer.front(); - EXPECT_TRUE(eq(*IOBuf::copyBuffer("ats up"), *rt6.data.front())); + expected = "ats up"; + EXPECT_EQ( + folly::ByteRange((uint8_t*)expected.data(), expected.size()), + rt6.data.getHead()->getRange()); EXPECT_EQ(rt6.offset, 6); EXPECT_EQ(rt6.eof, 1); @@ -394,8 +410,6 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnection) { } TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionPacketRetrans) { - const IOBufEqualTo eq; - auto conn = createConn(); auto mockCongestionController = std::make_unique>(); @@ -466,7 +480,9 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionPacketRetrans) { ASSERT_EQ(stream1->retransmissionBuffer.size(), 1); auto& rt = *stream1->retransmissionBuffer.at(0); EXPECT_EQ(rt.offset, 0); - EXPECT_TRUE(eq(*buf, *rt.data.front())); + EXPECT_EQ( + folly::ByteRange(buf->data(), buf->length()), + rt.data.getHead()->getRange()); EXPECT_TRUE(rt.eof); stream1->lossBuffer.push_back(std::move(rt)); } @@ -474,7 +490,9 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionPacketRetrans) { ASSERT_EQ(stream2->retransmissionBuffer.size(), 1); auto& rt = *stream2->retransmissionBuffer.at(0); EXPECT_EQ(rt.offset, 0); - EXPECT_TRUE(eq(*buf, *rt.data.front())); + EXPECT_EQ( + folly::ByteRange(buf->data(), buf->length()), + rt.data.getHead()->getRange()); EXPECT_TRUE(rt.eof); stream2->lossBuffer.push_back(std::move(rt)); } @@ -546,8 +564,6 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionPacketRetrans) { TEST_F( QuicTransportFunctionsTest, TestUpdateConnectionPacketRetransWithNewData) { - const IOBufEqualTo eq; - auto conn = createConn(); auto mockCongestionController = std::make_unique>(); @@ -635,7 +651,9 @@ TEST_F( ASSERT_EQ(stream1->retransmissionBuffer.size(), 1); auto& rt = *stream1->retransmissionBuffer.at(0); EXPECT_EQ(rt.offset, 0); - EXPECT_TRUE(eq(*buf, *rt.data.front())); + EXPECT_EQ( + folly::ByteRange(buf->data(), buf->length()), + rt.data.getHead()->getRange()); EXPECT_TRUE(rt.eof); stream1->lossBuffer.push_back(std::move(rt)); } @@ -643,7 +661,10 @@ TEST_F( ASSERT_EQ(stream2->retransmissionBuffer.size(), 1); auto& rt = *stream2->retransmissionBuffer.at(0); EXPECT_EQ(rt.offset, 0); - EXPECT_TRUE(eq(*IOBuf::copyBuffer("hey w"), *rt.data.front())); + auto expectedBuf = IOBuf::copyBuffer("hey w"); + EXPECT_EQ( + folly::ByteRange(expectedBuf->data(), expectedBuf->length()), + rt.data.getHead()->getRange()); EXPECT_FALSE(rt.eof); stream2->lossBuffer.push_back(std::move(rt)); } @@ -651,7 +672,9 @@ TEST_F( ASSERT_EQ(stream3->retransmissionBuffer.size(), 1); auto& rt = *stream3->retransmissionBuffer.at(0); EXPECT_EQ(rt.offset, 0); - EXPECT_TRUE(eq(*buf, *rt.data.front())); + EXPECT_EQ( + folly::ByteRange(buf->data(), buf->length()), + rt.data.getHead()->getRange()); EXPECT_FALSE(rt.eof); stream3->lossBuffer.push_back(std::move(rt)); } @@ -856,7 +879,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionFinOnly) { EXPECT_EQ(stream1->currentWriteOffset, 1); EXPECT_EQ(rt1.offset, 0); - EXPECT_EQ(rt1.data.front()->computeChainDataLength(), 0); + EXPECT_EQ(rt1.data.chainLength(), 0); EXPECT_TRUE(rt1.eof); } @@ -907,9 +930,7 @@ TEST_F(QuicTransportFunctionsTest, TestUpdateConnectionAllBytesExceptFin) { EXPECT_EQ(stream1->retransmissionBuffer.size(), 1); auto& rt1 = *stream1->retransmissionBuffer.at(0); EXPECT_EQ(rt1.offset, 0); - EXPECT_EQ( - rt1.data.front()->computeChainDataLength(), - buf->computeChainDataLength()); + EXPECT_EQ(rt1.data.chainLength(), buf->computeChainDataLength()); EXPECT_FALSE(rt1.eof); } @@ -1043,11 +1064,12 @@ TEST_F(QuicTransportFunctionsTest, TestImplicitAck) { auto initialStream = getCryptoStream(*conn->cryptoState, EncryptionLevel::Initial); - ASSERT_TRUE(initialStream->writeBuffer.empty()); + ASSERT_TRUE(initialStream->pendingWrites.empty()); ASSERT_TRUE(initialStream->retransmissionBuffer.empty()); ASSERT_TRUE(initialStream->lossBuffer.empty()); auto packet = buildEmptyPacket(*conn, PacketNumberSpace::Initial); packet.packet.frames.push_back(WriteCryptoFrame(0, data->length())); + initialStream->pendingWrites.append(data); initialStream->writeBuffer.append(data->clone()); updateConnection( *conn, @@ -1067,7 +1089,9 @@ TEST_F(QuicTransportFunctionsTest, TestImplicitAck) { WriteCryptoFrame(data->length(), data->length())); packet.packet.frames.push_back( WriteCryptoFrame(data->length() * 2, data->length())); + initialStream->pendingWrites.append(data); initialStream->writeBuffer.append(data->clone()); + initialStream->pendingWrites.append(data); initialStream->writeBuffer.append(data->clone()); updateConnection( *conn, @@ -1081,12 +1105,12 @@ TEST_F(QuicTransportFunctionsTest, TestImplicitAck) { EXPECT_EQ(0, conn->outstandings.packetCount[PacketNumberSpace::Handshake]); EXPECT_EQ(2, conn->outstandings.packets.size()); EXPECT_EQ(3, initialStream->retransmissionBuffer.size()); - EXPECT_TRUE(initialStream->writeBuffer.empty()); + EXPECT_TRUE(initialStream->pendingWrites.empty()); EXPECT_TRUE(initialStream->lossBuffer.empty()); // Fake loss. - Buf firstBuf = - initialStream->retransmissionBuffer.find(0)->second->data.move(); + ChainedByteRangeHead firstBuf( + std::move(initialStream->retransmissionBuffer.find(0)->second->data)); initialStream->retransmissionBuffer.erase(0); initialStream->lossBuffer.emplace_back(std::move(firstBuf), 0, false); conn->outstandings.packets.pop_front(); @@ -1094,11 +1118,12 @@ TEST_F(QuicTransportFunctionsTest, TestImplicitAck) { auto handshakeStream = getCryptoStream(*conn->cryptoState, EncryptionLevel::Handshake); - ASSERT_TRUE(handshakeStream->writeBuffer.empty()); + ASSERT_TRUE(handshakeStream->pendingWrites.empty()); ASSERT_TRUE(handshakeStream->retransmissionBuffer.empty()); ASSERT_TRUE(handshakeStream->lossBuffer.empty()); packet = buildEmptyPacket(*conn, PacketNumberSpace::Handshake); packet.packet.frames.push_back(WriteCryptoFrame(0, data->length())); + handshakeStream->pendingWrites.append(data); handshakeStream->writeBuffer.append(data->clone()); updateConnection( *conn, @@ -1116,6 +1141,7 @@ TEST_F(QuicTransportFunctionsTest, TestImplicitAck) { packet = buildEmptyPacket(*conn, PacketNumberSpace::Handshake); packet.packet.frames.push_back( WriteCryptoFrame(data->length(), data->length())); + handshakeStream->pendingWrites.append(data); handshakeStream->writeBuffer.append(data->clone()); updateConnection( *conn, @@ -1129,11 +1155,12 @@ TEST_F(QuicTransportFunctionsTest, TestImplicitAck) { EXPECT_EQ(2, conn->outstandings.packetCount[PacketNumberSpace::Handshake]); EXPECT_EQ(3, conn->outstandings.packets.size()); EXPECT_EQ(2, handshakeStream->retransmissionBuffer.size()); - EXPECT_TRUE(handshakeStream->writeBuffer.empty()); + EXPECT_TRUE(handshakeStream->pendingWrites.empty()); EXPECT_TRUE(handshakeStream->lossBuffer.empty()); // Fake loss. - firstBuf = handshakeStream->retransmissionBuffer.find(0)->second->data.move(); + firstBuf = ChainedByteRangeHead( + std::move(handshakeStream->retransmissionBuffer.find(0)->second->data)); handshakeStream->retransmissionBuffer.erase(0); handshakeStream->lossBuffer.emplace_back(std::move(firstBuf), 0, false); auto& op = conn->outstandings.packets.front(); @@ -1149,7 +1176,7 @@ TEST_F(QuicTransportFunctionsTest, TestImplicitAck) { EXPECT_EQ(1, conn->outstandings.packetCount[PacketNumberSpace::Handshake]); EXPECT_EQ(1, conn->outstandings.packets.size()); EXPECT_TRUE(initialStream->retransmissionBuffer.empty()); - EXPECT_TRUE(initialStream->writeBuffer.empty()); + EXPECT_TRUE(initialStream->pendingWrites.empty()); EXPECT_TRUE(initialStream->lossBuffer.empty()); implicitAckCryptoStream(*conn, EncryptionLevel::Handshake); @@ -1157,7 +1184,7 @@ TEST_F(QuicTransportFunctionsTest, TestImplicitAck) { EXPECT_EQ(0, conn->outstandings.packetCount[PacketNumberSpace::Handshake]); EXPECT_TRUE(conn->outstandings.packets.empty()); EXPECT_TRUE(handshakeStream->retransmissionBuffer.empty()); - EXPECT_TRUE(handshakeStream->writeBuffer.empty()); + EXPECT_TRUE(handshakeStream->pendingWrites.empty()); EXPECT_TRUE(handshakeStream->lossBuffer.empty()); } @@ -3310,7 +3337,7 @@ TEST_F(QuicTransportFunctionsTest, NoCryptoProbeWriteIfNoProbeCredit) { EXPECT_EQ(conn->udpSendPacketLen, res.bytesWritten); ASSERT_EQ(1, conn->outstandings.packets.size()); ASSERT_EQ(1, cryptoStream->retransmissionBuffer.size()); - ASSERT_TRUE(cryptoStream->writeBuffer.empty()); + ASSERT_TRUE(cryptoStream->pendingWrites.empty()); conn->pendingEvents.numProbePackets[PacketNumberSpace::Initial] = 0; conn->pendingEvents.numProbePackets[PacketNumberSpace::Handshake] = 0; @@ -3663,8 +3690,9 @@ TEST_F(QuicTransportFunctionsTest, HasAckDataToWriteMismatch) { TEST_F(QuicTransportFunctionsTest, HasCryptoDataToWrite) { auto conn = createConn(); + auto dataBuf = folly::IOBuf::copyBuffer("Grab your coat and get your hat"); conn->cryptoState->initialStream.lossBuffer.emplace_back( - folly::IOBuf::copyBuffer("Grab your coat and get your hat"), 0, false); + ChainedByteRangeHead(dataBuf), 0, false); EXPECT_EQ(WriteDataReason::CRYPTO_STREAM, hasNonAckDataToWrite(*conn)); conn->initialWriteCipher.reset(); EXPECT_EQ(WriteDataReason::NO_WRITE, hasNonAckDataToWrite(*conn)); @@ -4155,19 +4183,21 @@ TEST_F(QuicTransportFunctionsTest, HandshakeConfirmedDropCipher) { LongHeader::Types::Handshake); ASSERT_FALSE(initialStream->retransmissionBuffer.empty()); ASSERT_FALSE(handshakeStream->retransmissionBuffer.empty()); - initialStream->insertIntoLossBuffer(std::make_unique( - folly::IOBuf::copyBuffer( - "I don't see the dialup info in the meeting invite"), - 0, - false)); - handshakeStream->insertIntoLossBuffer(std::make_unique( - folly::IOBuf::copyBuffer("Traffic Protocol Weekly Sync"), 0, false)); + auto lossBufferData1 = folly::IOBuf::copyBuffer( + "I don't see the dialup info in the meeting invite"); + initialStream->insertIntoLossBuffer(std::make_unique( + ChainedByteRangeHead(lossBufferData1), 0, false)); + + auto lossBufferData2 = + folly::IOBuf::copyBuffer("Traffic Protocol Weekly Sync"); + handshakeStream->insertIntoLossBuffer(std::make_unique( + ChainedByteRangeHead(lossBufferData2), 0, false)); handshakeConfirmed(*conn); - EXPECT_TRUE(initialStream->writeBuffer.empty()); + EXPECT_TRUE(initialStream->pendingWrites.empty()); EXPECT_TRUE(initialStream->retransmissionBuffer.empty()); EXPECT_TRUE(initialStream->lossBuffer.empty()); - EXPECT_TRUE(handshakeStream->writeBuffer.empty()); + EXPECT_TRUE(handshakeStream->pendingWrites.empty()); EXPECT_TRUE(handshakeStream->retransmissionBuffer.empty()); EXPECT_TRUE(handshakeStream->lossBuffer.empty()); EXPECT_EQ(nullptr, conn->initialWriteCipher); diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 9a4b3b530..d781e43a9 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -206,7 +206,7 @@ void verifyCorrectness( auto stream = conn.streamManager->findStream(id); ASSERT_TRUE(stream); if (writeAll) { - EXPECT_TRUE(stream->writeBuffer.empty()); + EXPECT_TRUE(stream->pendingWrites.empty()); } EXPECT_EQ(stream->currentWriteOffset, endOffset + (finSet ? 1 : 0)); EXPECT_EQ( @@ -220,7 +220,7 @@ void verifyCorrectness( std::vector rtxCopy; for (auto& itr : stream->retransmissionBuffer) { rtxCopy.push_back(StreamBuffer( - itr.second->data.front()->clone(), + folly::IOBuf::copyBuffer(itr.second->data.getHead()->getRange()), itr.second->offset, itr.second->eof)); } @@ -297,7 +297,8 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLoss) { curBuf->append(curBuf->capacity()); curBuf = curBuf->next(); } while (curBuf != largeBuf.get()); - lossStreamState->lossBuffer.emplace_back(std::move(largeBuf), 31, false); + ChainedByteRangeHead largeBufRch(largeBuf); + lossStreamState->lossBuffer.emplace_back(std::move(largeBufRch), 31, false); conn.streamManager->updateWritableStreams(*lossStreamState); transport_->writeChain( stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr); @@ -2097,9 +2098,9 @@ TEST_F(QuicTransportTest, RstStream) { ASSERT_TRUE(stream); EXPECT_EQ(stream->sendState, StreamSendState::ResetSent); EXPECT_TRUE(stream->retransmissionBuffer.empty()); - EXPECT_TRUE(stream->writeBuffer.empty()); + EXPECT_TRUE(stream->pendingWrites.empty()); EXPECT_FALSE(stream->writable()); - EXPECT_TRUE(stream->writeBuffer.empty()); + EXPECT_TRUE(stream->pendingWrites.empty()); EXPECT_FALSE(writableContains( *transport_->getConnectionState().streamManager, stream->id)); } @@ -2849,7 +2850,7 @@ TEST_F(QuicTransportTest, RstWrittenStream) { EXPECT_EQ(stream->sendState, StreamSendState::ResetSent); EXPECT_TRUE(stream->retransmissionBuffer.empty()); - EXPECT_TRUE(stream->writeBuffer.empty()); + EXPECT_TRUE(stream->pendingWrites.empty()); EXPECT_FALSE(stream->writable()); EXPECT_FALSE(writableContains( *transport_->getConnectionState().streamManager, stream->id)); @@ -2888,7 +2889,7 @@ TEST_F(QuicTransportTest, RstStreamUDPWriteFailNonFatal) { // this steam unwriable and drop current writeBuffer and // retransmissionBuffer: EXPECT_TRUE(stream->retransmissionBuffer.empty()); - EXPECT_TRUE(stream->writeBuffer.empty()); + EXPECT_TRUE(stream->pendingWrites.empty()); EXPECT_FALSE(stream->writable()); } @@ -2919,7 +2920,7 @@ TEST_F(QuicTransportTest, WriteAfterSendRst) { EXPECT_EQ(stream->sendState, StreamSendState::ResetSent); EXPECT_TRUE(stream->retransmissionBuffer.empty()); - EXPECT_TRUE(stream->writeBuffer.empty()); + EXPECT_TRUE(stream->pendingWrites.empty()); EXPECT_FALSE(stream->writable()); EXPECT_FALSE(writableContains( *transport_->getConnectionState().streamManager, stream->id)); @@ -3221,11 +3222,12 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) { conn.lossState.srtt = 100us; auto streamState = conn.streamManager->getStream(stream); streamState->retransmissionBuffer.clear(); + auto retxBufferData = folly::IOBuf::copyBuffer("But i'm not delivered yet"); streamState->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(51), - std::forward_as_tuple(std::make_unique( - folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(retxBufferData), 51, false))); folly::SocketAddress addr; NetworkData emptyData; @@ -3269,13 +3271,15 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) { auto streamState = conn.streamManager->getStream(stream); streamState->retransmissionBuffer.clear(); streamState->lossBuffer.clear(); + auto retxBufferData = folly::IOBuf::copyBuffer("But i'm not delivered yet"); streamState->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(51), - std::forward_as_tuple(std::make_unique( - folly::IOBuf::copyBuffer("But i'm not delivered yet"), 51, false))); - streamState->lossBuffer.emplace_back( - folly::IOBuf::copyBuffer("And I'm lost"), 31, false); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(retxBufferData), 51, false))); + auto lossBufferData = folly::IOBuf::copyBuffer("And I'm lost"); + ChainedByteRangeHead lossBufferRch(lossBufferData); + streamState->lossBuffer.emplace_back(std::move(lossBufferRch), 31, false); streamState->ackedIntervals.insert(0, 30); folly::SocketAddress addr; diff --git a/quic/codec/QuicPacketRebuilder.cpp b/quic/codec/QuicPacketRebuilder.cpp index b9841ccda..826472c85 100644 --- a/quic/codec/QuicPacketRebuilder.cpp +++ b/quic/codec/QuicPacketRebuilder.cpp @@ -279,7 +279,7 @@ Optional PacketRebuilder::rebuildFromPacket( return cloneOutstandingPacket(packet); } -const BufQueue* PacketRebuilder::cloneCryptoRetransmissionBuffer( +const ChainedByteRangeHead* PacketRebuilder::cloneCryptoRetransmissionBuffer( const WriteCryptoFrame& frame, const QuicCryptoStream& stream) { /** @@ -303,7 +303,7 @@ const BufQueue* PacketRebuilder::cloneCryptoRetransmissionBuffer( return &(iter->second->data); } -const BufQueue* PacketRebuilder::cloneRetransmissionBuffer( +const ChainedByteRangeHead* PacketRebuilder::cloneRetransmissionBuffer( const WriteStreamFrame& frame, const QuicStreamState* stream) { /** diff --git a/quic/codec/QuicPacketRebuilder.h b/quic/codec/QuicPacketRebuilder.h index 56619b102..603c014e7 100644 --- a/quic/codec/QuicPacketRebuilder.h +++ b/quic/codec/QuicPacketRebuilder.h @@ -46,11 +46,11 @@ class PacketRebuilder { return stream.sendState == StreamSendState::Open; } - const BufQueue* cloneCryptoRetransmissionBuffer( + const ChainedByteRangeHead* cloneCryptoRetransmissionBuffer( const WriteCryptoFrame& frame, const QuicCryptoStream& stream); - const BufQueue* cloneRetransmissionBuffer( + const ChainedByteRangeHead* cloneRetransmissionBuffer( const WriteStreamFrame& frame, const QuicStreamState* stream); diff --git a/quic/codec/QuicWriteCodec.cpp b/quic/codec/QuicWriteCodec.cpp index ebf90430a..66a7c8e80 100644 --- a/quic/codec/QuicWriteCodec.cpp +++ b/quic/codec/QuicWriteCodec.cpp @@ -167,6 +167,15 @@ Optional writeStreamFrameHeader( return folly::make_optional(dataLen); } +void writeStreamFrameData( + PacketBuilderInterface& builder, + const ChainedByteRangeHead& writeBuffer, + uint64_t dataLen) { + if (dataLen > 0) { + builder.insert(writeBuffer, dataLen); + } +} + void writeStreamFrameData( PacketBuilderInterface& builder, const BufQueue& writeBuffer, @@ -187,7 +196,7 @@ void writeStreamFrameData( Optional writeCryptoFrame( uint64_t offsetIn, - const BufQueue& data, + const ChainedByteRangeHead& data, PacketBuilderInterface& builder) { uint64_t spaceLeftInPkt = builder.remainingSpaceInPkt(); QuicInteger intFrameType(static_cast(FrameType::CRYPTO_FRAME)); diff --git a/quic/codec/QuicWriteCodec.h b/quic/codec/QuicWriteCodec.h index 7a2c27aaa..8daf3c8b1 100644 --- a/quic/codec/QuicWriteCodec.h +++ b/quic/codec/QuicWriteCodec.h @@ -60,6 +60,11 @@ Optional writeStreamFrameHeader( OptionalIntegral streamGroupId = std::nullopt, bool appendFrame = true); +void writeStreamFrameData( + PacketBuilderInterface& builder, + const ChainedByteRangeHead& writeBuffer, + uint64_t dataLen); + /** * Write stream frama data into builder * This writes dataLen worth of bytes from the parameter writeBuffer into the @@ -95,7 +100,7 @@ void writeStreamFrameData( */ Optional writeCryptoFrame( uint64_t offsetIn, - const BufQueue& data, + const ChainedByteRangeHead& data, PacketBuilderInterface& builder); /** diff --git a/quic/codec/test/QuicPacketBuilderTest.cpp b/quic/codec/test/QuicPacketBuilderTest.cpp index d21538cf8..95dd79950 100644 --- a/quic/codec/test/QuicPacketBuilderTest.cpp +++ b/quic/codec/test/QuicPacketBuilderTest.cpp @@ -199,12 +199,14 @@ TEST_P(QuicPacketBuilderTest, LongHeaderRegularPacket) { return rawBuilder; }; + auto chloBuf = folly::IOBuf::copyBuffer("CHLO"); + ChainedByteRangeHead chloRch(chloBuf); auto resultRegularPacket = createInitialCryptoPacket( serverConnId, clientConnId, pktNum, ver, - *folly::IOBuf::copyBuffer("CHLO"), + chloRch, *cleartextAead, 0 /* largestAcked */, 0 /* offset */, diff --git a/quic/codec/test/QuicPacketRebuilderTest.cpp b/quic/codec/test/QuicPacketRebuilderTest.cpp index 1dc28464e..c93dbefa8 100644 --- a/quic/codec/test/QuicPacketRebuilderTest.cpp +++ b/quic/codec/test/QuicPacketRebuilderTest.cpp @@ -148,19 +148,20 @@ TEST_F(QuicPacketRebuilderTest, RebuildPacket) { regularBuilder1, buf->clone(), buf->computeChainDataLength()); writeFrame(maxDataFrame, regularBuilder1); writeFrame(maxStreamDataFrame, regularBuilder1); - writeCryptoFrame(cryptoOffset, cryptoBuf->clone(), regularBuilder1); + writeCryptoFrame( + cryptoOffset, ChainedByteRangeHead(cryptoBuf), regularBuilder1); auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(8, packet1.packet.frames.size()); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple( - std::make_unique(buf->clone(), 0, true))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf), 0, true))); conn.cryptoState->oneRttStream.retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple( - std::make_unique(cryptoBuf->clone(), 0, true))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(cryptoBuf), 0, true))); // Write an updated ackState that should be used when rebuilding the AckFrame conn.ackStates.appDataAckState.acks.insert(1000, 1200); conn.ackStates.appDataAckState.largestRecvdPacketTime.assign( @@ -310,7 +311,8 @@ TEST_F(QuicPacketRebuilderTest, FinOnlyStreamRebuild) { stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple(std::make_unique(nullptr, 0, true))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(), 0, true))); // rebuild a packet from the built out packet ShortHeader shortHeader2( @@ -363,14 +365,15 @@ TEST_F(QuicPacketRebuilderTest, RebuildDataStreamAndEmptyCryptoStream) { none /* skipLenHint */); writeStreamFrameData( regularBuilder1, buf->clone(), buf->computeChainDataLength()); - writeCryptoFrame(cryptoOffset, cryptoBuf->clone(), regularBuilder1); + writeCryptoFrame( + cryptoOffset, ChainedByteRangeHead(cryptoBuf), regularBuilder1); auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(2, packet1.packet.frames.size()); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple( - std::make_unique(buf->clone(), 0, true))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf), 0, true))); // Do not add the buf to crypto stream's retransmission buffer, // imagine it was cleared @@ -412,7 +415,8 @@ TEST_F(QuicPacketRebuilderTest, CannotRebuildEmptyCryptoStream) { auto cryptoBuf = folly::IOBuf::copyBuffer("NewSessionTicket"); // Write them with a regular builder - writeCryptoFrame(cryptoOffset, cryptoBuf->clone(), regularBuilder1); + writeCryptoFrame( + cryptoOffset, ChainedByteRangeHead(cryptoBuf), regularBuilder1); auto packet1 = std::move(regularBuilder1).buildPacket(); ASSERT_EQ(1, packet1.packet.frames.size()); // Do not add the buf to crypto stream's retransmission buffer, @@ -477,8 +481,8 @@ TEST_F(QuicPacketRebuilderTest, CannotRebuild) { stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple( - std::make_unique(buf->clone(), 0, true))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf), 0, true))); // new builder has a much smaller writable bytes limit ShortHeader shortHeader2( @@ -582,13 +586,13 @@ TEST_F(QuicPacketRebuilderTest, LastStreamFrameSkipLen) { stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple( - std::make_unique(buf1->clone(), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf1), 0, false))); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(buf1->computeChainDataLength()), - std::forward_as_tuple(std::make_unique( - buf2->clone(), buf1->computeChainDataLength(), true))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf2), buf1->computeChainDataLength(), true))); MockQuicPacketBuilder mockBuilder; size_t packetLimit = 1200; @@ -596,10 +600,11 @@ TEST_F(QuicPacketRebuilderTest, LastStreamFrameSkipLen) { return packetLimit; })); // write data twice - EXPECT_CALL(mockBuilder, insert(_, _)) + EXPECT_CALL(mockBuilder, _insertRch(_, _)) .Times(2) - .WillRepeatedly( - Invoke([&](const BufQueue&, size_t limit) { packetLimit -= limit; })); + .WillRepeatedly(Invoke([&](const ChainedByteRangeHead&, size_t limit) { + packetLimit -= limit; + })); // Append frame twice EXPECT_CALL(mockBuilder, appendFrame(_)).Times(2); // initial byte: @@ -649,19 +654,19 @@ TEST_F(QuicPacketRebuilderTest, LastStreamFrameFinOnlySkipLen) { 0, true, none); - writeStreamFrameData(regularBuilder, nullptr, 0); + writeStreamFrameData(regularBuilder, ChainedByteRangeHead(), 0); auto packet = std::move(regularBuilder).buildPacket(); auto outstandingPacket = makeDummyOutstandingPacket(packet.packet, 1200); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple( - std::make_unique(buf1->clone(), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf1), 0, false))); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(buf1->computeChainDataLength()), - std::forward_as_tuple(std::make_unique( - nullptr, buf1->computeChainDataLength(), true))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(), buf1->computeChainDataLength(), true))); MockQuicPacketBuilder mockBuilder; size_t packetLimit = 1200; @@ -669,10 +674,11 @@ TEST_F(QuicPacketRebuilderTest, LastStreamFrameFinOnlySkipLen) { return packetLimit; })); // write data only - EXPECT_CALL(mockBuilder, insert(_, _)) + EXPECT_CALL(mockBuilder, _insertRch(_, _)) .Times(1) - .WillOnce( - Invoke([&](const BufQueue&, size_t limit) { packetLimit -= limit; })); + .WillOnce(Invoke([&](const ChainedByteRangeHead&, size_t limit) { + packetLimit -= limit; + })); // Append frame twice EXPECT_CALL(mockBuilder, appendFrame(_)).Times(2); // initial byte: diff --git a/quic/codec/test/QuicReadCodecTest.cpp b/quic/codec/test/QuicReadCodecTest.cpp index 1b47b9113..93bc9f510 100644 --- a/quic/codec/test/QuicReadCodecTest.cpp +++ b/quic/codec/test/QuicReadCodecTest.cpp @@ -198,7 +198,8 @@ TEST_F(QuicReadCodecTest, LongHeaderPacketLenMismatch) { kDefaultUDPSendPacketLen, std::move(headerIn), 0 /* largestAcked */); builder.encodePacketHeader(); builder.accountForCipherOverhead(0); - writeCryptoFrame(0, folly::IOBuf::copyBuffer("CHLO"), builder); + auto cryptoFrameBuf = folly::IOBuf::copyBuffer("CHLO"); + writeCryptoFrame(0, ChainedByteRangeHead(cryptoFrameBuf), builder); auto packet = packetToBuf(std::move(builder).buildPacket()); auto packetQueue = bufToQueue(std::move(packet)); @@ -654,12 +655,14 @@ TEST_F(QuicReadCodecTest, TestInitialPacket) { auto aead = cryptoFactory.getClientInitialCipher(connId, QuicVersion::MVFST); auto headerCipher = cryptoFactory.makeClientInitialHeaderCipher(connId, QuicVersion::MVFST); + auto initialCryptoPacketBuf = folly::IOBuf::copyBuffer("CHLO"); + ChainedByteRangeHead initialCryptoPacketRch(initialCryptoPacketBuf); auto packet = createInitialCryptoPacket( getTestConnectionId(), connId, packetNum, QuicVersion::MVFST, - *folly::IOBuf::copyBuffer("CHLO"), + initialCryptoPacketRch, *aead, offset); @@ -689,12 +692,14 @@ TEST_F(QuicReadCodecTest, TestInitialPacketExtractToken) { auto headerCipher = cryptoFactory.makeClientInitialHeaderCipher(connId, QuicVersion::MVFST); std::string token = "aswerdfewdewrgetg"; + auto initialCryptoPacketBuf = folly::IOBuf::copyBuffer("CHLO"); + ChainedByteRangeHead initialCryptoPacketRch(initialCryptoPacketBuf); auto packet = createInitialCryptoPacket( getTestConnectionId(), connId, packetNum, QuicVersion::MVFST, - *folly::IOBuf::copyBuffer("CHLO"), + initialCryptoPacketRch, *aead, offset, 0 /* offset */, @@ -723,12 +728,14 @@ TEST_F(QuicReadCodecTest, TestHandshakeDone) { auto aead = cryptoFactory.getClientInitialCipher(connId, QuicVersion::MVFST); auto headerCipher = cryptoFactory.makeClientInitialHeaderCipher(connId, QuicVersion::MVFST); + auto initialCryptoPacketBuf = folly::IOBuf::copyBuffer("CHLO"); + ChainedByteRangeHead initialCryptoPacketRch(initialCryptoPacketBuf); auto packet = createInitialCryptoPacket( getTestConnectionId(), connId, packetNum, QuicVersion::MVFST, - *folly::IOBuf::copyBuffer("CHLO"), + initialCryptoPacketRch, *aead, offset); diff --git a/quic/common/ChainedByteRange.cpp b/quic/common/ChainedByteRange.cpp index e9464552f..df031f6ec 100644 --- a/quic/common/ChainedByteRange.cpp +++ b/quic/common/ChainedByteRange.cpp @@ -44,6 +44,21 @@ ChainedByteRangeHead::ChainedByteRangeHead(const Buf& buf) { tail_ = cur; } +ChainedByteRangeHead::ChainedByteRangeHead( + ChainedByteRangeHead&& other) noexcept { + moveChain(std::move(other)); +} + +ChainedByteRangeHead& ChainedByteRangeHead::operator=( + ChainedByteRangeHead&& other) noexcept { + moveChain(std::move(other)); + return *this; +} + +ChainedByteRangeHead::~ChainedByteRangeHead() { + resetChain(); +} + void ChainedByteRangeHead::append(const Buf& buf) { if (!buf || buf->empty()) { return; diff --git a/quic/common/ChainedByteRange.h b/quic/common/ChainedByteRange.h index 535379e74..23c7450ff 100644 --- a/quic/common/ChainedByteRange.h +++ b/quic/common/ChainedByteRange.h @@ -66,18 +66,11 @@ class ChainedByteRangeHead { ChainedByteRangeHead() = default; - ChainedByteRangeHead(ChainedByteRangeHead&& other) noexcept { - moveChain(std::move(other)); - } + ChainedByteRangeHead(ChainedByteRangeHead&& other) noexcept; - ChainedByteRangeHead& operator=(ChainedByteRangeHead&& other) noexcept { - moveChain(std::move(other)); - return *this; - } + ChainedByteRangeHead& operator=(ChainedByteRangeHead&& other) noexcept; - ~ChainedByteRangeHead() { - resetChain(); - } + ~ChainedByteRangeHead(); bool isChained() const { return head_.next_ != nullptr; diff --git a/quic/common/test/TestUtils.cpp b/quic/common/test/TestUtils.cpp index f5337c764..39c4b5649 100644 --- a/quic/common/test/TestUtils.cpp +++ b/quic/common/test/TestUtils.cpp @@ -317,7 +317,7 @@ RegularQuicPacketBuilder::Packet createInitialCryptoPacket( ConnectionId dstConnId, PacketNum packetNum, QuicVersion version, - folly::IOBuf& data, + ChainedByteRangeHead& data, const Aead& aead, PacketNum largestAcked, uint64_t offset, @@ -342,7 +342,7 @@ RegularQuicPacketBuilder::Packet createInitialCryptoPacket( } builder->encodePacketHeader(); builder->accountForCipherOverhead(aead.getCipherOverhead()); - auto res = writeCryptoFrame(offset, data.clone(), *builder); + auto res = writeCryptoFrame(offset, data, *builder); CHECK(res.hasValue()) << "failed to write crypto frame"; return std::move(*builder).buildPacket(); } @@ -353,7 +353,7 @@ RegularQuicPacketBuilder::Packet createCryptoPacket( PacketNum packetNum, QuicVersion version, ProtectionType protectionType, - folly::IOBuf& data, + ChainedByteRangeHead& data, const Aead& aead, PacketNum largestAcked, uint64_t offset, @@ -385,7 +385,7 @@ RegularQuicPacketBuilder::Packet createCryptoPacket( packetSizeLimit, std::move(*header), largestAcked); builder.encodePacketHeader(); builder.accountForCipherOverhead(aead.getCipherOverhead()); - auto res = writeCryptoFrame(offset, data.clone(), builder); + auto res = writeCryptoFrame(offset, data, builder); CHECK(res.hasValue()) << "failed to write crypto frame"; return std::move(builder).buildPacket(); } @@ -734,12 +734,6 @@ CongestionController::AckEvent::AckPacket makeAckPacketFromOutstandingPacket( .build(); } -Optional -writeCryptoFrame(uint64_t offsetIn, Buf data, PacketBuilderInterface& builder) { - BufQueue bufQueue(std::move(data)); - return writeCryptoFrame(offsetIn, bufQueue, builder); -} - void overridePacketWithToken( PacketBuilderInterface::Packet& packet, const StatelessResetToken& token) { diff --git a/quic/common/test/TestUtils.h b/quic/common/test/TestUtils.h index b34b41ad7..73557db2c 100644 --- a/quic/common/test/TestUtils.h +++ b/quic/common/test/TestUtils.h @@ -96,7 +96,7 @@ RegularQuicPacketBuilder::Packet createInitialCryptoPacket( ConnectionId dstConnId, PacketNum packetNum, QuicVersion version, - folly::IOBuf& data, + ChainedByteRangeHead& data, const Aead& aead, PacketNum largestAcked, uint64_t offset = 0, @@ -109,7 +109,7 @@ RegularQuicPacketBuilder::Packet createCryptoPacket( PacketNum packetNum, QuicVersion version, ProtectionType protectionType, - folly::IOBuf& data, + ChainedByteRangeHead& data, const Aead& aead, PacketNum largestAcked, uint64_t offset = 0, diff --git a/quic/dsr/backend/test/DSRPacketizerTest.cpp b/quic/dsr/backend/test/DSRPacketizerTest.cpp index b12a78db4..5476768b2 100644 --- a/quic/dsr/backend/test/DSRPacketizerTest.cpp +++ b/quic/dsr/backend/test/DSRPacketizerTest.cpp @@ -146,6 +146,7 @@ TEST_F(DSRMultiWriteTest, TwoRequestsWithLoss) { stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); stream->currentWriteOffset = stream->writeBuffer.chainLength(); stream->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream); auto bufMetaStartingOffset = stream->writeBufMeta.offset; // Move part of the BufMetas to lossBufMetas diff --git a/quic/dsr/frontend/Scheduler.cpp b/quic/dsr/frontend/Scheduler.cpp index 0155dec04..bf3cb1b5c 100644 --- a/quic/dsr/frontend/Scheduler.cpp +++ b/quic/dsr/frontend/Scheduler.cpp @@ -87,7 +87,7 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( stream->lossBufMetas.front() .length, // flowControlLen shouldn't be used to limit loss write stream->lossBufMetas.front().eof, - stream->currentWriteOffset + stream->writeBuffer.chainLength()); + stream->currentWriteOffset + stream->pendingWrites.chainLength()); if (encodedSize > 0) { if (builder.remainingSpace() < encodedSize) { return result; @@ -113,8 +113,8 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( if (connWritableBytes == 0) { return result; } - // When stream still has writeBuffer, getSendStreamFlowControlBytesWire counts - // from currentWriteOffset which isn't right for BufMetas. + // When stream still has pendingWrites, getSendStreamFlowControlBytesWire + // counts from currentWriteOffset which isn't right for BufMetas. auto streamFlowControlLen = std::min( getSendStreamFlowControlBytesWire(*stream), stream->flowControlState.peerAdvertisedMaxOffset - @@ -131,7 +131,7 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( stream->writeBufMeta.length, flowControlLen, canWriteFin, - stream->currentWriteOffset + stream->writeBuffer.chainLength()); + stream->currentWriteOffset + stream->pendingWrites.chainLength()); if (encodedSize > 0) { if (builder.remainingSpace() < encodedSize) { return result; diff --git a/quic/dsr/frontend/test/WriteFunctionsTest.cpp b/quic/dsr/frontend/test/WriteFunctionsTest.cpp index 314fe14b0..df5525c89 100644 --- a/quic/dsr/frontend/test/WriteFunctionsTest.cpp +++ b/quic/dsr/frontend/test/WriteFunctionsTest.cpp @@ -76,7 +76,7 @@ TEST_F(WriteFunctionsTest, WriteLoopTimeLimit) { // Pretend we sent the non DSR data stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); stream->currentWriteOffset = stream->writeBuffer.chainLength(); - stream->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream); auto currentBufMetaOffset = stream->writeBufMeta.offset; size_t packetLimit = 2; @@ -108,7 +108,7 @@ TEST_F(WriteFunctionsTest, WriteLoopTimeLimitNoLimit) { // Pretend we sent the non DSR data stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); stream->currentWriteOffset = stream->writeBuffer.chainLength(); - stream->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream); auto currentBufMetaOffset = stream->writeBufMeta.offset; size_t packetLimit = 2; @@ -140,7 +140,7 @@ TEST_F(WriteFunctionsTest, WriteTwoInstructions) { // Pretend we sent the non DSR data stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); stream->currentWriteOffset = stream->writeBuffer.chainLength(); - stream->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream); auto cid = getTestConnectionId(); size_t packetLimit = 20; @@ -162,7 +162,7 @@ TEST_F(WriteFunctionsTest, PacketLimit) { // Pretend we sent the non DSR data stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); stream->currentWriteOffset = stream->writeBuffer.chainLength(); - stream->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream); auto mockCongestionController = std::make_unique>(); @@ -192,7 +192,8 @@ TEST_F(WriteFunctionsTest, WriteTwoStreams) { // Pretend we sent the non DSR data on second stream stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1); stream2->currentWriteOffset = stream2->writeBuffer.chainLength(); - stream2->writeBuffer.move(); + ChainedByteRangeHead( + std::move(stream2->pendingWrites)); // Destruct the pendingWrites conn_.streamManager->updateWritableStreams(*stream2); auto cid = getTestConnectionId(); size_t packetLimit = 20; @@ -221,7 +222,7 @@ TEST_F(WriteFunctionsTest, WriteThreeStreamsNonDsrAndDsr) { // Pretend we sent the non DSR data for last stream stream3->ackedIntervals.insert(0, stream3->writeBuffer.chainLength() - 1); stream3->currentWriteOffset = stream3->writeBuffer.chainLength(); - stream3->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream3->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream3); EXPECT_EQ(2, writePacketizationRequest(conn_, cid, packetLimit, *aead_)); EXPECT_EQ(1, stream1->retransmissionBufMetas.size()); @@ -245,7 +246,7 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsNonIncremental) { // Pretend we sent the non DSR data on first stream stream1->ackedIntervals.insert(0, stream1->writeBuffer.chainLength() - 1); stream1->currentWriteOffset = stream1->writeBuffer.chainLength(); - stream1->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream1->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream1); auto cid = getTestConnectionId(); size_t packetLimit = 2; @@ -269,7 +270,7 @@ TEST_F(WriteFunctionsTest, WriteTwoStreamsIncremental) { // Pretend we sent the non DSR data on second stream stream2->ackedIntervals.insert(0, stream2->writeBuffer.chainLength() - 1); stream2->currentWriteOffset = stream2->writeBuffer.chainLength(); - stream2->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream2->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream2); auto cid = getTestConnectionId(); size_t packetLimit = 2; @@ -289,7 +290,7 @@ TEST_F(WriteFunctionsTest, LossAndFreshTwoInstructionsInTwoPackets) { // Pretend we sent the non DSR data stream->ackedIntervals.insert(0, stream->writeBuffer.chainLength() - 1); stream->currentWriteOffset = stream->writeBuffer.chainLength(); - stream->writeBuffer.move(); + ChainedByteRangeHead(std::move(stream->pendingWrites)); conn_.streamManager->updateWritableStreams(*stream); auto bufMetaStartingOffset = stream->writeBufMeta.offset; // Move part of the BufMetas to lossBufMetas diff --git a/quic/fizz/client/test/QuicClientTransportTest.cpp b/quic/fizz/client/test/QuicClientTransportTest.cpp index 369ccc349..ce9d6fc74 100644 --- a/quic/fizz/client/test/QuicClientTransportTest.cpp +++ b/quic/fizz/client/test/QuicClientTransportTest.cpp @@ -5593,6 +5593,7 @@ TEST_F(QuicProcessDataTest, ProcessDataWithGarbageAtEnd) { *client->getConn().initialDestinationConnectionId)); mockClientHandshake->setServerTransportParams(std::move(*params)); auto serverHello = IOBuf::copyBuffer("Fake SHLO"); + ChainedByteRangeHead serverHelloRch(serverHello); PacketNum nextPacketNum = initialPacketNum++; auto& aead = getInitialCipher(); auto packet = createCryptoPacket( @@ -5601,7 +5602,7 @@ TEST_F(QuicProcessDataTest, ProcessDataWithGarbageAtEnd) { nextPacketNum, QuicVersion::QUIC_V1, ProtectionType::Initial, - *serverHello, + serverHelloRch, aead, 0 /* largestAcked */); auto packetData = packetToBufCleartext( @@ -5632,6 +5633,7 @@ TEST_F(QuicProcessDataTest, ProcessPendingData) { *client->getConn().initialDestinationConnectionId)); mockClientHandshake->setServerTransportParams(std::move(*params)); auto serverHello = IOBuf::copyBuffer("Fake SHLO"); + ChainedByteRangeHead serverHelloRch(serverHello); PacketNum nextPacketNum = initialPacketNum++; auto& aead = getInitialCipher(); auto packet = createCryptoPacket( @@ -5640,7 +5642,7 @@ TEST_F(QuicProcessDataTest, ProcessPendingData) { nextPacketNum, QuicVersion::QUIC_V1, ProtectionType::Initial, - *serverHello, + serverHelloRch, aead, 0 /* largestAcked */); auto packetData = packetToBufCleartext( @@ -5671,13 +5673,14 @@ TEST_F(QuicProcessDataTest, ProcessPendingData) { EXPECT_EQ(client->getConn().pendingOneRttData.size(), 1); auto cryptoData = folly::IOBuf::copyBuffer("Crypto data!"); + ChainedByteRangeHead cryptoDataRch(cryptoData); auto cryptoPacket1 = packetToBuf(createCryptoPacket( *serverChosenConnId, *originalConnId, handshakePacketNum++, QuicVersion::QUIC_V1, ProtectionType::Handshake, - *cryptoData, + cryptoDataRch, *createNoOpAead(), 0 /* largestAcked */)); deliverData(cryptoPacket1->coalesce()); @@ -5708,7 +5711,7 @@ TEST_F(QuicProcessDataTest, ProcessPendingData) { handshakePacketNum++, QuicVersion::QUIC_V1, ProtectionType::Handshake, - *cryptoData, + cryptoDataRch, *createNoOpAead(), 0, cryptoData->length())); @@ -5741,6 +5744,7 @@ TEST_F(QuicProcessDataTest, ProcessPendingDataBufferLimit) { *client->getConn().initialDestinationConnectionId)); mockClientHandshake->setServerTransportParams(std::move(*params)); auto serverHello = IOBuf::copyBuffer("Fake SHLO"); + ChainedByteRangeHead serverHelloRch(serverHello); PacketNum nextPacketNum = initialPacketNum++; auto& aead = getInitialCipher(); auto packet = createCryptoPacket( @@ -5749,7 +5753,7 @@ TEST_F(QuicProcessDataTest, ProcessPendingDataBufferLimit) { nextPacketNum, QuicVersion::QUIC_V1, ProtectionType::Initial, - *serverHello, + serverHelloRch, aead, 0 /* largestAcked */); auto packetData = packetToBufCleartext( @@ -5839,6 +5843,7 @@ TEST_P(QuicProcessDataTest, ProcessDataHeaderOnly) { auto qLogger = std::make_shared(VantagePoint::Client); client->getNonConstConn().qLogger = qLogger; auto serverHello = IOBuf::copyBuffer("Fake SHLO"); + ChainedByteRangeHead serverHelloRch(serverHello); PacketNum nextPacketNum = initialPacketNum++; auto& aead = getInitialCipher(); auto largestRecvdPacketNum = @@ -5850,7 +5855,7 @@ TEST_P(QuicProcessDataTest, ProcessDataHeaderOnly) { nextPacketNum, QuicVersion::QUIC_V1, ProtectionType::Initial, - *serverHello, + serverHelloRch, aead, 0 /* largestAcked */); diff --git a/quic/fizz/client/test/QuicClientTransportTestUtil.h b/quic/fizz/client/test/QuicClientTransportTestUtil.h index bd4812e2a..a4c8a9fe0 100644 --- a/quic/fizz/client/test/QuicClientTransportTestUtil.h +++ b/quic/fizz/client/test/QuicClientTransportTestUtil.h @@ -605,6 +605,7 @@ class QuicClientTransportTestBase : public virtual testing::Test { void recvServerHello(const folly::SocketAddress& addr) { auto serverHello = folly::IOBuf::copyBuffer("Fake SHLO"); + ChainedByteRangeHead serverHelloRch(serverHello); PacketNum nextPacketNum = initialPacketNum++; auto& aead = getInitialCipher(); auto packet = packetToBufCleartext( @@ -614,7 +615,7 @@ class QuicClientTransportTestBase : public virtual testing::Test { nextPacketNum, version, ProtectionType::Initial, - *serverHello, + serverHelloRch, aead, 0 /* largestAcked */), aead, @@ -654,13 +655,14 @@ class QuicClientTransportTestBase : public virtual testing::Test { void recvTicket(Optional offsetOverride = none) { auto negotiatedVersion = *client->getConn().version; auto ticket = folly::IOBuf::copyBuffer("NST"); + ChainedByteRangeHead ticketRch(ticket); auto packet = packetToBuf(createCryptoPacket( *serverChosenConnId, *originalConnId, appDataPacketNum++, negotiatedVersion, ProtectionType::KeyPhaseZero, - *ticket, + ticketRch, *createNoOpAead(), 0 /* largestAcked */, offsetOverride diff --git a/quic/flowcontrol/QuicFlowController.cpp b/quic/flowcontrol/QuicFlowController.cpp index 0954dab4e..1d0da3a83 100644 --- a/quic/flowcontrol/QuicFlowController.cpp +++ b/quic/flowcontrol/QuicFlowController.cpp @@ -247,13 +247,13 @@ void updateFlowControlOnResetStream(QuicStreamState& stream) { decrementWithOverFlowCheck( stream.conn.flowControlState.sumCurStreamBufferLen, static_cast( - stream.writeBuffer.chainLength() + stream.writeBufMeta.length)); + stream.pendingWrites.chainLength() + stream.writeBufMeta.length)); } void maybeWriteBlockAfterAPIWrite(QuicStreamState& stream) { // Only write blocked when stream becomes blocked if (getSendStreamFlowControlBytesWire(stream) == 0 && - stream.writeBuffer.empty() && stream.writeBufMeta.length == 0) { + stream.pendingWrites.empty() && stream.writeBufMeta.length == 0) { stream.conn.streamManager->queueBlocked( stream.id, stream.flowControlState.peerAdvertisedMaxOffset); if (stream.conn.qLogger) { @@ -290,7 +290,7 @@ void maybeWriteBlockAfterSocketWrite(QuicStreamState& stream) { } else { shouldEmitStreamBlockedFrame = getSendStreamFlowControlBytesWire(stream) == 0 && - (!stream.writeBuffer.empty() || stream.writeBufMeta.length > 0); + (!stream.pendingWrites.empty() || stream.writeBufMeta.length > 0); } if (shouldEmitStreamBlockedFrame && @@ -314,7 +314,7 @@ void handleStreamWindowUpdate( stream.flowControlState.peerAdvertisedMaxOffset = maximumData; stream.flowControlState.pendingBlockedFrame = false; if (stream.flowControlState.peerAdvertisedMaxOffset > - stream.currentWriteOffset + stream.writeBuffer.chainLength() + + stream.currentWriteOffset + stream.pendingWrites.chainLength() + stream.writeBufMeta.length) { updateFlowControlList(stream); } @@ -368,7 +368,7 @@ uint64_t getSendStreamFlowControlBytesWire(const QuicStreamState& stream) { uint64_t getSendStreamFlowControlBytesAPI(const QuicStreamState& stream) { auto sendFlowControlBytes = getSendStreamFlowControlBytesWire(stream); auto dataInBuffer = - stream.writeBuffer.chainLength() + stream.writeBufMeta.length; + stream.pendingWrites.chainLength() + stream.writeBufMeta.length; if (dataInBuffer > sendFlowControlBytes) { return 0; } else { diff --git a/quic/flowcontrol/test/QuicFlowControlTest.cpp b/quic/flowcontrol/test/QuicFlowControlTest.cpp index 35a0e320b..bdcb022bf 100644 --- a/quic/flowcontrol/test/QuicFlowControlTest.cpp +++ b/quic/flowcontrol/test/QuicFlowControlTest.cpp @@ -335,12 +335,15 @@ TEST_F(QuicFlowControlTest, MaybeWriteBlockedAfterAPIWrite) { EXPECT_FALSE(conn_.streamManager->hasBlocked()); stream.currentWriteOffset = 400; - stream.writeBuffer.append(IOBuf::copyBuffer("1234")); + auto dataBuf = IOBuf::copyBuffer("1234"); + stream.pendingWrites.append(dataBuf); + stream.writeBuffer.append(std::move(dataBuf)); EXPECT_CALL(*quicStats_, onStreamFlowControlBlocked()).Times(0); maybeWriteBlockAfterAPIWrite(stream); EXPECT_FALSE(conn_.streamManager->hasBlocked()); stream.writeBuffer.move(); + ChainedByteRangeHead(std::move(stream.pendingWrites)); stream.currentWriteOffset = 600; stream.flowControlState.peerAdvertisedMaxOffset = 600; EXPECT_CALL(*quicStats_, onStreamFlowControlBlocked()).Times(1); @@ -840,6 +843,7 @@ TEST_F(QuicFlowControlTest, WritableList) { // Fin writeDataToQuicStream(stream, nullptr, true); stream.writeBuffer.move(); + ChainedByteRangeHead(std::move(stream.pendingWrites)); stream.currentWriteOffset += 100; stream.flowControlState.peerAdvertisedMaxOffset = stream.currentWriteOffset; conn_.streamManager->updateWritableStreams(stream); @@ -865,6 +869,7 @@ TEST_F(QuicFlowControlTest, GetSendStreamFlowControlBytesAPIEmpty) { auto buf = IOBuf::create(200); buf->append(200); + stream.pendingWrites.append(buf); stream.writeBuffer.append(std::move(buf)); stream.flowControlState.peerAdvertisedMaxOffset = 300; @@ -878,6 +883,7 @@ TEST_F(QuicFlowControlTest, GetSendStreamFlowControlBytesAPIPartial) { auto buf = IOBuf::create(200); buf->append(200); + stream.pendingWrites.append(buf); stream.writeBuffer.append(std::move(buf)); stream.flowControlState.peerAdvertisedMaxOffset = 500; @@ -966,7 +972,9 @@ TEST_F(QuicFlowControlTest, StreamFlowControlWithBufMeta) { QuicStreamState stream(id, conn_); stream.flowControlState.peerAdvertisedMaxOffset = 1000; stream.currentWriteOffset = 200; - stream.writeBuffer.append(buildRandomInputData(100)); + auto inputData = buildRandomInputData(100); + stream.pendingWrites.append(inputData); + stream.writeBuffer.append(std::move(inputData)); EXPECT_EQ(800, getSendStreamFlowControlBytesWire(stream)); EXPECT_EQ(700, getSendStreamFlowControlBytesAPI(stream)); @@ -978,6 +986,7 @@ TEST_F(QuicFlowControlTest, StreamFlowControlWithBufMeta) { stream.currentWriteOffset += stream.writeBuffer.chainLength(); stream.writeBuffer.move(); + ChainedByteRangeHead(std::move(stream.pendingWrites)); EXPECT_EQ(700, getSendStreamFlowControlBytesWire(stream)); EXPECT_EQ(400, getSendStreamFlowControlBytesAPI(stream)); } diff --git a/quic/loss/test/QuicLossFunctionsTest.cpp b/quic/loss/test/QuicLossFunctionsTest.cpp index 8e78ceb1e..51bfb6dc6 100644 --- a/quic/loss/test/QuicLossFunctionsTest.cpp +++ b/quic/loss/test/QuicLossFunctionsTest.cpp @@ -511,8 +511,49 @@ TEST_F(QuicLossFunctionsTest, TestMarkPacketLoss) { auto& buffer = stream1->lossBuffer.front(); EXPECT_EQ(buffer.offset, 0); - IOBufEqualTo eq; - EXPECT_TRUE(eq(buf, buffer.data.move())); + EXPECT_EQ( + folly::ByteRange(buf->data(), buf->length()), + buffer.data.getHead()->getRange()); +} + +bool areEqual(folly::IOBuf* ptr, ChainedByteRangeHead* rch) { + if (ptr->computeChainDataLength() != rch->chainLength()) { + return false; + } + + auto* chainedByteRange = rch->getHead(); + + const uint8_t* currentIOBufDataPtr = ptr->data(); + const uint8_t* currentRchDataPtr = chainedByteRange->getRange().data(); + + uint32_t remainingLenIOBuf = ptr->length(); + uint32_t remainingLenRch = chainedByteRange->getRange().size(); + + for (uint32_t i = 0; i < rch->chainLength(); i++) { + while (remainingLenIOBuf == 0) { + ptr = ptr->next(); + remainingLenIOBuf = ptr->length(); + currentIOBufDataPtr = ptr->data(); + } + + while (remainingLenRch == 0) { + chainedByteRange = chainedByteRange->getNext(); + remainingLenRch = chainedByteRange->getRange().size(); + currentRchDataPtr = chainedByteRange->getRange().data(); + } + + if (*currentIOBufDataPtr != *currentRchDataPtr) { + return false; + } + + remainingLenIOBuf--; + remainingLenRch--; + + currentIOBufDataPtr++; + currentRchDataPtr++; + } + + return true; } TEST_F(QuicLossFunctionsTest, TestMarkPacketLossMerge) { @@ -568,8 +609,7 @@ TEST_F(QuicLossFunctionsTest, TestMarkPacketLossMerge) { combined->prependChain(buf2->clone()); auto& buffer = stream1->lossBuffer.front(); EXPECT_EQ(buffer.offset, 0); - IOBufEqualTo eq; - EXPECT_TRUE(eq(combined, buffer.data.move())); + EXPECT_TRUE(areEqual(combined.get(), &buffer.data)); } TEST_F(QuicLossFunctionsTest, TestMarkPacketLossNoMerge) { @@ -636,12 +676,15 @@ TEST_F(QuicLossFunctionsTest, TestMarkPacketLossNoMerge) { auto& buffer1 = stream1->lossBuffer[0]; EXPECT_EQ(buffer1.offset, 0); - IOBufEqualTo eq; - EXPECT_TRUE(eq(buf1, buffer1.data.move())); + EXPECT_EQ( + folly::ByteRange(buf1->data(), buf1->length()), + buffer1.data.getHead()->getRange()); auto& buffer3 = stream1->lossBuffer[1]; EXPECT_EQ(buffer3.offset, 40); - EXPECT_TRUE(eq(buf3, buffer3.data.move())); + EXPECT_EQ( + folly::ByteRange(buf3->data(), buf3->length()), + buffer3.data.getHead()->getRange()); } TEST_F(QuicLossFunctionsTest, RetxBufferSortedAfterLoss) { @@ -704,7 +747,7 @@ TEST_F(QuicLossFunctionsTest, TestMarkPacketLossAfterStreamReset) { EXPECT_TRUE(stream1->lossBuffer.empty()); EXPECT_TRUE(stream1->retransmissionBuffer.empty()); - EXPECT_TRUE(stream1->writeBuffer.empty()); + EXPECT_TRUE(stream1->pendingWrites.empty()); } TEST_F(QuicLossFunctionsTest, TestReorderingThreshold) { @@ -1178,7 +1221,7 @@ TEST_F(QuicLossFunctionsTest, PTOWithLostInitialData) { conn->oneRttWriteHeaderCipher = createNoOpHeaderCipher(); auto buf = buildRandomInputData(20); - StreamBuffer initialData(std::move(buf), 0); + WriteStreamBuffer initialData(ChainedByteRangeHead(buf), 0); conn->cryptoState->initialStream.lossBuffer.push_back(std::move(initialData)); ASSERT_TRUE(conn->outstandings.packets.empty()) @@ -1208,7 +1251,7 @@ TEST_F(QuicLossFunctionsTest, PTOWithLostHandshakeData) { conn->oneRttWriteHeaderCipher = createNoOpHeaderCipher(); auto buf = buildRandomInputData(20); - StreamBuffer handshakeData(std::move(buf), 0); + WriteStreamBuffer handshakeData(ChainedByteRangeHead(buf), 0); conn->cryptoState->handshakeStream.lossBuffer.push_back( std::move(handshakeData)); @@ -1238,7 +1281,7 @@ TEST_F(QuicLossFunctionsTest, PTOWithLostAppData) { conn->oneRttWriteHeaderCipher = createNoOpHeaderCipher(); auto buf = buildRandomInputData(20); - StreamBuffer appData(std::move(buf), 0); + WriteStreamBuffer appData(ChainedByteRangeHead(buf), 0); conn->cryptoState->oneRttStream.lossBuffer.push_back(std::move(appData)); ASSERT_TRUE(conn->outstandings.packets.empty()) @@ -2374,13 +2417,15 @@ TEST_F(QuicLossFunctionsTest, LossVisitorDSRTest) { false, 0 /* PacketNum */, PacketNumberSpace::AppData); - ASSERT_EQ(0, stream->writeBuffer.chainLength()); + ASSERT_EQ(0, stream->pendingWrites.chainLength()); auto retxIter = stream->retransmissionBuffer.find(0); ASSERT_NE(stream->retransmissionBuffer.end(), retxIter); ASSERT_EQ(0, retxIter->second->offset); ASSERT_FALSE(retxIter->second->eof); - ASSERT_TRUE(folly::IOBufEqualTo()( - *folly::IOBuf::copyBuffer("grape"), *retxIter->second->data.front())); + auto expectedBuf = folly::IOBuf::copyBuffer("grape"); + ASSERT_EQ( + folly::ByteRange(expectedBuf->buffer(), expectedBuf->length()), + retxIter->second->data.getHead()->getRange()); ASSERT_EQ(stream->currentWriteOffset, bufMetaStartingOffset); // Send BufMeta in 3 chunks: diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 49993e4dd..ad8ce3af1 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -1186,9 +1186,8 @@ QuicSocket::WriteResult QuicServerTransport::writeBufMeta( if (!stream->dsrSender) { return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); } - if (stream->currentWriteOffset == 0 && stream->writeBuffer.empty()) { - // If nothing has been written to writeBuffer ever, meta writing isn't - // allowed. + if (stream->currentWriteOffset == 0 && stream->pendingWrites.empty()) { + // If nothing has been written ever, meta writing isn't allowed. return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); } // Register DeliveryCallback for the data + eof offset. diff --git a/quic/server/test/QuicServerTransportTest.cpp b/quic/server/test/QuicServerTransportTest.cpp index 9097d4273..fa7bdb405 100644 --- a/quic/server/test/QuicServerTransportTest.cpp +++ b/quic/server/test/QuicServerTransportTest.cpp @@ -817,11 +817,13 @@ TEST_F(QuicServerTransportTest, RecvRstStreamFrame) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); + + auto wordsBuf2 = IOBuf::copyBuffer(words.at(2)); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple(std::make_unique( - IOBuf::copyBuffer(words.at(2)), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(wordsBuf2), 0, false))); writeDataToQuicStream(*stream, IOBuf::copyBuffer(words.at(3)), false); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -875,11 +877,12 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrame) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); + auto wordsBuf2 = IOBuf::copyBuffer(words.at(2)); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple(std::make_unique( - IOBuf::copyBuffer(words.at(2)), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(wordsBuf2), 0, false))); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -919,11 +922,12 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterCloseStream) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); + auto wordsBuf2 = IOBuf::copyBuffer(words.at(2)); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple(std::make_unique( - IOBuf::copyBuffer(words.at(2)), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(wordsBuf2), 0, false))); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -964,11 +968,12 @@ TEST_F(QuicServerTransportTest, RecvInvalidMaxStreamData) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); + auto wordsBuf2 = IOBuf::copyBuffer(words.at(2)); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple(std::make_unique( - IOBuf::copyBuffer(words.at(2)), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(wordsBuf2), 0, false))); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1006,11 +1011,12 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterHalfCloseRemote) { stream->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); + auto wordsBuf2 = IOBuf::copyBuffer(words.at(2)); stream->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple(std::make_unique( - IOBuf::copyBuffer(words.at(2)), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(wordsBuf2), 0, false))); stream->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1087,11 +1093,12 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterReset) { stream1->readBuffer.emplace_back(IOBuf::copyBuffer(words.at(0)), 0, false); stream1->readBuffer.emplace_back( IOBuf::copyBuffer(words.at(1)), words.at(0).length(), false); + auto wordsBuf2 = IOBuf::copyBuffer(words.at(2)); stream1->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple(std::make_unique( - IOBuf::copyBuffer(words.at(2)), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(wordsBuf2), 0, false))); stream1->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream1->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream1->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -1102,8 +1109,8 @@ TEST_F(QuicServerTransportTest, RecvStopSendingFrameAfterReset) { stream2->retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(0), - std::forward_as_tuple(std::make_unique( - IOBuf::copyBuffer(words.at(2)), 0, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(wordsBuf2), 0, false))); stream2->writeBuffer.append(IOBuf::copyBuffer(words.at(3))); stream2->currentWriteOffset = words.at(2).length() + words.at(3).length(); stream2->currentReadOffset = words.at(0).length() + words.at(1).length(); @@ -3570,13 +3577,15 @@ TEST_F(QuicUnencryptedServerTransportTest, TestBadCleartextEncryption) { PacketNum nextPacket = clientNextInitialPacketNum++; auto aead = cryptoFactory.getServerInitialCipher( *clientConnectionId, QuicVersion::MVFST); + auto chloBuf = IOBuf::copyBuffer("CHLO"); + ChainedByteRangeHead chloRch(chloBuf); auto packetData = packetToBufCleartext( createInitialCryptoPacket( *clientConnectionId, *initialDestinationConnectionId, nextPacket, QuicVersion::MVFST, - *IOBuf::copyBuffer("CHLO"), + chloRch, *aead, 0 /* largestAcked */), *aead, @@ -4214,13 +4223,14 @@ TEST_F(QuicUnencryptedServerTransportTest, TestCorruptedDstCidInitialTest) { EXPECT_TRUE(clientConnectionId); EXPECT_NE(*initialDestinationConnectionId, corruptedDstCid); + ChainedByteRangeHead chloRch(chlo); auto initialPacket = packetToBufCleartext( createInitialCryptoPacket( *clientConnectionId, corruptedDstCid, nextPacketNum, QuicVersion::MVFST, - *chlo, + chloRch, *aead, 0 /* largestAcked */), *aead, @@ -4489,13 +4499,15 @@ TEST_F(QuicUnencryptedServerTransportTest, TestGarbageData) { PacketNum nextPacket = clientNextInitialPacketNum++; auto aead = getInitialCipher(); auto headerCipher = getInitialHeaderCipher(); + auto chloBuf = IOBuf::copyBuffer("CHLO"); + ChainedByteRangeHead chloRch(chloBuf); auto packet = createCryptoPacket( *clientConnectionId, *initialDestinationConnectionId, nextPacket, QuicVersion::MVFST, ProtectionType::Initial, - *IOBuf::copyBuffer("CHLO"), + chloRch, *aead, 0 /* largestAcked */); auto packetData = diff --git a/quic/server/test/QuicServerTransportTestUtil.h b/quic/server/test/QuicServerTransportTestUtil.h index 2eafd379f..5fcfb8ec8 100644 --- a/quic/server/test/QuicServerTransportTestUtil.h +++ b/quic/server/test/QuicServerTransportTestUtil.h @@ -297,13 +297,14 @@ class QuicServerTransportTestBase : public virtual testing::Test { auto nextPacketNum = clientNextInitialPacketNum++; auto aead = getInitialCipher(version); auto headerCipher = getInitialHeaderCipher(version); + ChainedByteRangeHead chloRch(chlo); auto initialPacket = packetToBufCleartext( createInitialCryptoPacket( *clientConnectionId, *initialDestinationConnectionId, nextPacketNum, version, - *chlo, + chloRch, *aead, 0 /* largestAcked */), *aead, @@ -324,6 +325,7 @@ class QuicServerTransportTestBase : public virtual testing::Test { *server->getConn().cryptoState, EncryptionLevel::Handshake) ->currentReadOffset; auto handshakeCipher = test::createNoOpAead(); + ChainedByteRangeHead finishedRch(finished); auto finishedPacket = packetToBufCleartext( createCryptoPacket( *clientConnectionId, @@ -331,7 +333,7 @@ class QuicServerTransportTestBase : public virtual testing::Test { nextPacketNum, version, ProtectionType::Handshake, - *finished, + finishedRch, *handshakeCipher, 0 /* largestAcked */, offset), diff --git a/quic/state/QuicStreamFunctions.cpp b/quic/state/QuicStreamFunctions.cpp index 9f552a684..449966739 100644 --- a/quic/state/QuicStreamFunctions.cpp +++ b/quic/state/QuicStreamFunctions.cpp @@ -44,10 +44,10 @@ void writeDataToQuicStream(QuicStreamState& stream, Buf data, bool eof) { // write a blocked frame first time the stream becomes blocked maybeWriteBlockAfterAPIWrite(stream); } + stream.pendingWrites.append(data); stream.writeBuffer.append(std::move(data)); if (eof) { - auto bufferSize = - stream.writeBuffer.front() ? stream.writeBuffer.chainLength() : 0; + auto bufferSize = stream.pendingWrites.chainLength(); stream.finalWriteOffset = stream.currentWriteOffset + bufferSize; } updateFlowControlOnWriteToStream(stream, len); @@ -62,7 +62,7 @@ void writeBufMetaToQuicStream( maybeWriteBlockAfterAPIWrite(stream); } auto realDataLength = - stream.currentWriteOffset + stream.writeBuffer.chainLength(); + stream.currentWriteOffset + stream.pendingWrites.chainLength(); CHECK_GT(realDataLength, 0) << "Real data has to be written to a stream before any buffer meta is" << "written to it."; @@ -83,6 +83,7 @@ void writeBufMetaToQuicStream( } void writeDataToQuicStream(QuicCryptoStream& stream, Buf data) { + stream.pendingWrites.append(data); stream.writeBuffer.append(std::move(data)); } @@ -390,7 +391,7 @@ bool allBytesTillFinAcked(const QuicStreamState& stream) { * 5. We have no bytes that are detected as lost. */ return stream.hasSentFIN() && stream.retransmissionBuffer.empty() && - stream.retransmissionBufMetas.empty() && stream.writeBuffer.empty() && + stream.retransmissionBufMetas.empty() && stream.pendingWrites.empty() && !stream.hasWritableBufMeta() && stream.lossBuffer.empty() && stream.lossBufMetas.empty(); } @@ -409,7 +410,7 @@ void appendPendingStreamReset( * we risk using a value > peer's flow control limit. */ bool writeBufWritten = stream.writeBufMeta.offset && - (stream.currentWriteOffset + stream.writeBuffer.chainLength() != + (stream.currentWriteOffset + stream.pendingWrites.chainLength() != stream.writeBufMeta.offset); conn.pendingEvents.resets.emplace( std::piecewise_construct, @@ -426,7 +427,7 @@ void appendPendingStreamReset( uint64_t getLargestWriteOffsetSeen(const QuicStreamState& stream) { return stream.finalWriteOffset.value_or(std::max( - stream.currentWriteOffset + stream.writeBuffer.chainLength(), + stream.currentWriteOffset + stream.pendingWrites.chainLength(), stream.writeBufMeta.offset + stream.writeBufMeta.length)); } diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index 93b0021c8..31db26618 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -93,6 +93,28 @@ struct StreamBuffer { StreamBuffer& operator=(StreamBuffer&& other) = default; }; +struct WriteStreamBuffer { + ChainedByteRangeHead data; + uint64_t offset; + bool eof{false}; + + WriteStreamBuffer( + ChainedByteRangeHead&& dataIn, + uint64_t offsetIn, + bool eofIn = false) noexcept + : data(std::move(dataIn)), offset(offsetIn), eof(eofIn) {} + + WriteStreamBuffer(WriteStreamBuffer&& other) + : data(std::move(other.data)), offset(other.offset), eof(other.eof) {} + + WriteStreamBuffer& operator=(WriteStreamBuffer&& other) noexcept { + data = std::move(other.data); + offset = other.offset; + eof = other.eof; + return *this; + } +}; + struct QuicStreamLike { QuicStreamLike() = default; @@ -105,13 +127,15 @@ struct QuicStreamLike { CircularDeque readBuffer; // List of bytes that have been written to the QUIC layer. + uint64_t writeBufferStartOffset{0}; BufQueue writeBuffer{}; + ChainedByteRangeHead pendingWrites{}; // Stores a map of offset:buffers which have been written to the socket and // are currently un-acked. Each one represents one StreamFrame that was // written. We need to buffer these because these might be retransmitted in // the future. These are associated with the starting offset of the buffer. - folly::F14FastMap> + folly::F14FastMap> retransmissionBuffer; // Tracks intervals which we have received ACKs for. E.g. in the case of all @@ -125,10 +149,10 @@ struct QuicStreamLike { // Stores a list of buffers which have been marked as loss by loss detector. // Each one represents one StreamFrame that was written. - CircularDeque lossBuffer; + CircularDeque lossBuffer; - // Current offset of the start bytes in the write buffer. - // This changes when we pop stuff off the writeBuffer. + // Current offset of the start bytes in the pending writes chain. + // This changes when we pop stuff off the pendingWrites chain. // In a non-DSR stream, when we are finished writing out all the bytes until // FIN, this will be one greater than finalWriteOffset. // When DSR is used, this still points to the starting bytes in the write @@ -179,7 +203,7 @@ struct QuicStreamLike { * Either insert a new entry into the loss buffer, or merge the buffer with * an existing entry. */ - void insertIntoLossBuffer(std::unique_ptr buf) { + void insertIntoLossBuffer(std::unique_ptr buf) { // We assume here that we won't try to insert an overlapping buffer, as // that should never happen in the loss buffer. auto lossItr = std::upper_bound( @@ -190,10 +214,10 @@ struct QuicStreamLike { if (!lossBuffer.empty() && lossItr != lossBuffer.begin() && std::prev(lossItr)->offset + std::prev(lossItr)->data.chainLength() == buf->offset) { - std::prev(lossItr)->data.append(buf->data.move()); + std::prev(lossItr)->data.append(std::move(buf->data)); std::prev(lossItr)->eof = buf->eof; } else { - lossBuffer.insert(lossItr, std::move(*buf)); + lossBuffer.emplace(lossItr, std::move(*buf)); } } @@ -225,18 +249,17 @@ struct QuicStreamLike { size_t amountToSplit = removedStartOffset > lossStartOffset ? static_cast(removedStartOffset - lossStartOffset) : 0; - Buf splitBuf = nullptr; + ChainedByteRangeHead splitBuf; if (amountToSplit > 0) { splitBuf = lossItr->data.splitAtMost(amountToSplit); - CHECK(splitBuf); lossItr->offset += amountToSplit; } lossItr->offset += lossItr->data.trimStartAtMost(len); if (lossItr->data.empty() && lossItr->eof == eof) { lossBuffer.erase(lossItr); } - if (splitBuf) { - insertIntoLossBuffer(std::make_unique( + if (!splitBuf.empty()) { + insertIntoLossBuffer(std::make_unique( std::move(splitBuf), lossStartOffset, false)); } return; @@ -400,9 +423,9 @@ struct QuicStreamState : public QuicStreamLike { // If the stream has writable data that's not backed by DSR. That is, in a // regular stream write, it will be able to write something. So it either - // needs to have writeBuffer, or it has EOF to send. + // needs to have data in the pendingWrites chain, or it has EOF to send. bool hasWritableData() const { - if (!writeBuffer.empty()) { + if (!pendingWrites.empty()) { CHECK_GE(flowControlState.peerAdvertisedMaxOffset, currentWriteOffset); return flowControlState.peerAdvertisedMaxOffset - currentWriteOffset > 0; } @@ -458,7 +481,7 @@ struct QuicStreamState : public QuicStreamLike { if (writeBufMeta.offset == 0) { return currentWriteOffset; } - if (!writeBuffer.empty()) { + if (!pendingWrites.empty()) { return currentWriteOffset; } return writeBufMeta.offset; @@ -478,7 +501,7 @@ struct QuicStreamState : public QuicStreamLike { // BufferMeta that has been written to the QUIC layer. // When offset is 0, nothing has been written to it. On first write, its - // starting offset will be currentWriteOffset + writeBuffer.chainLength(). + // starting offset will be currentWriteOffset + pendingWrites.chainLength(). WriteBufferMeta writeBufMeta; // A map to store sent WriteBufferMetas for potential retransmission. diff --git a/quic/state/stream/StreamSendHandlers.cpp b/quic/state/stream/StreamSendHandlers.cpp index 80da5b2e6..f5902d398 100644 --- a/quic/state/stream/StreamSendHandlers.cpp +++ b/quic/state/stream/StreamSendHandlers.cpp @@ -153,7 +153,7 @@ void sendAckSMHandler( case StreamSendState::Closed: case StreamSendState::ResetSent: { DCHECK(stream.retransmissionBuffer.empty()); - DCHECK(stream.writeBuffer.empty()); + DCHECK(stream.pendingWrites.empty()); break; } case StreamSendState::Invalid: { diff --git a/quic/state/stream/StreamStateFunctions.cpp b/quic/state/stream/StreamStateFunctions.cpp index 7c7bc9bb4..bb904c70b 100644 --- a/quic/state/stream/StreamStateFunctions.cpp +++ b/quic/state/stream/StreamStateFunctions.cpp @@ -15,6 +15,7 @@ void resetQuicStream(QuicStreamState& stream, ApplicationErrorCode error) { updateFlowControlOnResetStream(stream); stream.retransmissionBuffer.clear(); stream.writeBuffer.move(); + ChainedByteRangeHead(std::move(stream.pendingWrites)); // Will be destructed stream.readBuffer.clear(); stream.lossBuffer.clear(); stream.streamWriteError = error; diff --git a/quic/state/stream/test/StreamStateFunctionsTest.cpp b/quic/state/stream/test/StreamStateFunctionsTest.cpp index e5e423c3b..6083e2d5b 100644 --- a/quic/state/stream/test/StreamStateFunctionsTest.cpp +++ b/quic/state/stream/test/StreamStateFunctionsTest.cpp @@ -36,10 +36,12 @@ TEST_F(StreamStateFunctionsTests, BasicResetTest) { StreamBuffer(folly::IOBuf::copyBuffer(" It is not a hotdog."), 15)); writeDataToQuicStream( stream, folly::IOBuf::copyBuffer("What is it then?"), false); + + std::string retxBufData = "How would I know?"; + Buf retxBuf = folly::IOBuf::copyBuffer(retxBufData); stream.retransmissionBuffer.emplace( 34, - std::make_unique( - folly::IOBuf::copyBuffer("How would I know?"), 34)); + std::make_unique(ChainedByteRangeHead(retxBuf), 34)); auto currentWriteOffset = stream.currentWriteOffset; auto currentReadOffset = stream.currentReadOffset; EXPECT_TRUE(stream.writable()); diff --git a/quic/state/stream/test/StreamStateMachineTest.cpp b/quic/state/stream/test/StreamStateMachineTest.cpp index 8708bb127..f0f994fa6 100644 --- a/quic/state/stream/test/StreamStateMachineTest.cpp +++ b/quic/state/stream/test/StreamStateMachineTest.cpp @@ -802,8 +802,8 @@ TEST_F(QuicUnidirectionalStreamTest, OpenFinalAckStreamFrame) { stream.retransmissionBuffer.emplace( std::piecewise_construct, std::forward_as_tuple(1), - std::forward_as_tuple( - std::make_unique(std::move(buf), 1, false))); + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf), 1, false))); sendAckSMHandler(stream, streamFrame); EXPECT_EQ(stream.sendState, StreamSendState::Closed); EXPECT_EQ(stream.recvState, StreamRecvState::Invalid); @@ -883,7 +883,7 @@ TEST_F(QuicOpenStateTest, DSRFullStreamAcked) { true, 1, PacketNumberSpace::AppData); - ASSERT_EQ(stream->writeBuffer.chainLength(), 0); + ASSERT_EQ(stream->pendingWrites.chainLength(), 0); ASSERT_NE( stream->retransmissionBufMetas.end(), stream->retransmissionBufMetas.find(bufMetaStartingOffset)); diff --git a/quic/state/test/AckHandlersTest.cpp b/quic/state/test/AckHandlersTest.cpp index d4f294d50..59da225f6 100644 --- a/quic/state/test/AckHandlersTest.cpp +++ b/quic/state/test/AckHandlersTest.cpp @@ -318,12 +318,13 @@ TEST_P(AckHandlersTest, TestSpuriousLossFullRemoval) { StreamId streamId = 1; auto streamState = conn.streamManager->createStream(streamId).value(); - BufQueue data{}; + ChainedByteRangeHead data; auto iob = folly::IOBuf::createChain(200, 200); iob->append(200); - data.append(std::move(iob)); + data.append(iob); ASSERT_EQ(data.chainLength(), 200); - auto streamBuffer = std::make_unique(data.move(), 0, false); + auto streamBuffer = + std::make_unique(std::move(data), 0, false); streamState->insertIntoLossBuffer(std::move(streamBuffer)); TimePoint startTime = Clock::now(); @@ -397,12 +398,13 @@ TEST_P(AckHandlersTest, TestSpuriousLossSplitMiddleRemoval) { StreamId streamId = 1; auto streamState = conn.streamManager->createStream(streamId).value(); - BufQueue data{}; + ChainedByteRangeHead data; auto iob = folly::IOBuf::createChain(200, 200); iob->append(200); - data.append(std::move(iob)); + data.append(iob); ASSERT_EQ(data.chainLength(), 200); - auto streamBuffer = std::make_unique(data.move(), 0, false); + auto streamBuffer = + std::make_unique(std::move(data), 0, false); streamState->insertIntoLossBuffer(std::move(streamBuffer)); TimePoint startTime = Clock::now(); @@ -482,12 +484,13 @@ TEST_P(AckHandlersTest, TestSpuriousLossTrimFrontRemoval) { StreamId streamId = 1; auto streamState = conn.streamManager->createStream(streamId).value(); - BufQueue data{}; + ChainedByteRangeHead data; auto iob = folly::IOBuf::createChain(200, 200); iob->append(200); - data.append(std::move(iob)); + data.append(iob); ASSERT_EQ(data.chainLength(), 200); - auto streamBuffer = std::make_unique(data.move(), 0, false); + auto streamBuffer = + std::make_unique(std::move(data), 0, false); streamState->insertIntoLossBuffer(std::move(streamBuffer)); TimePoint startTime = Clock::now(); @@ -564,12 +567,13 @@ TEST_P(AckHandlersTest, TestSpuriousLossSplitFrontRemoval) { StreamId streamId = 1; auto streamState = conn.streamManager->createStream(streamId).value(); - BufQueue data{}; + ChainedByteRangeHead data; auto iob = folly::IOBuf::createChain(200, 200); iob->append(200); - data.append(std::move(iob)); + data.append(iob); ASSERT_EQ(data.chainLength(), 200); - auto streamBuffer = std::make_unique(data.move(), 0, false); + auto streamBuffer = + std::make_unique(std::move(data), 0, false); streamState->insertIntoLossBuffer(std::move(streamBuffer)); TimePoint startTime = Clock::now(); diff --git a/quic/state/test/QuicStreamFunctionsTest.cpp b/quic/state/test/QuicStreamFunctionsTest.cpp index c70966656..dd0177581 100644 --- a/quic/state/test/QuicStreamFunctionsTest.cpp +++ b/quic/state/test/QuicStreamFunctionsTest.cpp @@ -1991,7 +1991,8 @@ TEST_F(QuicStreamFunctionsTest, AllBytesTillFinAckedStillLost) { QuicStreamState stream(id, conn); stream.finalWriteOffset = 20; stream.currentWriteOffset = 21; - stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); + auto dataBuf = IOBuf::create(10); + stream.lossBuffer.emplace_back(ChainedByteRangeHead(dataBuf), 10, false); EXPECT_FALSE(allBytesTillFinAcked(stream)); } @@ -2012,8 +2013,12 @@ TEST_F(QuicStreamFunctionsTest, AllBytesTillFinAckedStillRetransmitting) { StreamId id = 3; QuicStreamState stream(id, conn); stream.finalWriteOffset = 12; + + auto retxBufData = IOBuf::create(10); stream.retransmissionBuffer.emplace( - 0, std::make_unique(IOBuf::create(10), 10, false)); + 0, + std::make_unique( + ChainedByteRangeHead(retxBufData), 10, false)); EXPECT_FALSE(allBytesTillFinAcked(stream)); } @@ -2092,7 +2097,9 @@ TEST_F(QuicStreamFunctionsTest, LargestWriteOffsetSeenFIN) { TEST_F(QuicStreamFunctionsTest, LargestWriteOffsetSeenNoFIN) { QuicStreamState stream(3, conn); stream.currentWriteOffset = 100; - stream.writeBuffer.append(buildRandomInputData(20)); + auto randomInputData = buildRandomInputData(20); + stream.pendingWrites.append(randomInputData); + stream.writeBuffer.append(std::move(randomInputData)); EXPECT_EQ(120, getLargestWriteOffsetSeen(stream)); } @@ -2208,7 +2215,8 @@ TEST_F(QuicStreamFunctionsTest, LossBufferEmptyNoChange) { TEST_F(QuicStreamFunctionsTest, LossBufferHasData) { StreamId id = 4; QuicStreamState stream(id, conn); - stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); + auto dataBuf = IOBuf::create(10); + stream.lossBuffer.emplace_back(ChainedByteRangeHead(dataBuf), 10, false); conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); } @@ -2242,7 +2250,8 @@ TEST_F(QuicStreamFunctionsTest, LossBufferStillHasData) { StreamId id = 4; QuicStreamState stream(id, conn); conn.streamManager->addLoss(id); - stream.lossBuffer.emplace_back(IOBuf::create(10), 10, false); + auto dataBuf = IOBuf::create(10); + stream.lossBuffer.emplace_back(ChainedByteRangeHead(dataBuf), 10, false); conn.streamManager->updateWritableStreams(stream); EXPECT_TRUE(conn.streamManager->hasLoss()); } @@ -2270,6 +2279,7 @@ TEST_F(QuicStreamFunctionsTest, WritableList) { // Fin writeDataToQuicStream(stream, nullptr, true); stream.writeBuffer.move(); + ChainedByteRangeHead(std::move(stream.pendingWrites)); stream.currentWriteOffset += 100; stream.flowControlState.peerAdvertisedMaxOffset = stream.currentWriteOffset; conn.streamManager->updateWritableStreams(stream); @@ -2284,7 +2294,7 @@ TEST_F(QuicStreamFunctionsTest, WritableList) { TEST_F(QuicStreamFunctionsTest, AckCryptoStream) { auto chlo = IOBuf::copyBuffer("CHLO"); conn.cryptoState->handshakeStream.retransmissionBuffer.emplace( - 0, std::make_unique(chlo->clone(), 0)); + 0, std::make_unique(ChainedByteRangeHead(chlo), 0)); processCryptoStreamAck(conn.cryptoState->handshakeStream, 0, chlo->length()); EXPECT_EQ(conn.cryptoState->handshakeStream.retransmissionBuffer.size(), 0); } @@ -2293,7 +2303,7 @@ TEST_F(QuicStreamFunctionsTest, AckCryptoStreamOffsetLengthMismatch) { auto chlo = IOBuf::copyBuffer("CHLO"); auto& cryptoStream = conn.cryptoState->handshakeStream; cryptoStream.retransmissionBuffer.emplace( - 0, std::make_unique(chlo->clone(), 0)); + 0, std::make_unique(ChainedByteRangeHead(chlo), 0)); processCryptoStreamAck(cryptoStream, 1, chlo->length()); EXPECT_EQ(cryptoStream.retransmissionBuffer.size(), 1);