diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 32cce38ad..41b24dc07 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -363,17 +363,23 @@ DataPathResult iobufChainBasedBuildScheduleEncrypt( namespace quic { -void handleNewStreamDataWritten( - QuicConnectionStateBase& conn, +void handleNewStreamBufMetaWritten( QuicStreamLike& stream, uint64_t frameLen, + bool frameFin); + +void handleRetransmissionBufMetaWritten( + QuicStreamLike& stream, + uint64_t frameOffset, + uint64_t frameLen, bool frameFin, - PacketNum packetNum, - PacketNumberSpace packetNumberSpace) { + const decltype(stream.lossBufMetas)::iterator lossBufMetaIter); + +void handleNewStreamDataWritten( + QuicStreamLike& stream, + uint64_t frameLen, + bool frameFin) { auto originalOffset = stream.currentWriteOffset; - VLOG(10) << nodeToString(conn.nodeType) << " sent" - << " packetNum=" << packetNum << " space=" << packetNumberSpace - << " " << conn; // Idealy we should also check this data doesn't exist in either retx buffer // or loss buffer, but that's an expensive search. stream.currentWriteOffset += frameLen; @@ -389,17 +395,33 @@ void handleNewStreamDataWritten( .second); } +void handleNewStreamBufMetaWritten( + QuicStreamLike& stream, + uint64_t frameLen, + bool frameFin) { + CHECK_GT(stream.writeBufMeta.offset, 0); + auto originalOffset = stream.writeBufMeta.offset; + auto bufMetaSplit = stream.writeBufMeta.split(frameLen); + CHECK_EQ(bufMetaSplit.offset, originalOffset); + if (frameFin) { + // If FIN is written, nothing should be left in the writeBufMeta. + CHECK_EQ(0, stream.writeBufMeta.length); + ++stream.writeBufMeta.offset; + } + CHECK(stream.retransmissionBufMetas + .emplace( + std::piecewise_construct, + std::forward_as_tuple(originalOffset), + std::forward_as_tuple(bufMetaSplit)) + .second); +} + void handleRetransmissionWritten( - QuicConnectionStateBase& conn, QuicStreamLike& stream, uint64_t frameOffset, uint64_t frameLen, bool frameFin, - PacketNum packetNum, std::deque::iterator lossBufferIter) { - conn.lossState.totalBytesRetransmitted += frameLen; - VLOG(10) << nodeToString(conn.nodeType) << " sent retransmission" - << " packetNum=" << packetNum << " " << conn; auto bufferLen = lossBufferIter->data.chainLength(); Buf bufWritten; if (frameLen == bufferLen && frameFin == lossBufferIter->eof) { @@ -419,6 +441,31 @@ void handleRetransmissionWritten( .second); } +void handleRetransmissionBufMetaWritten( + QuicStreamLike& stream, + uint64_t frameOffset, + uint64_t frameLen, + bool frameFin, + const decltype(stream.lossBufMetas)::iterator lossBufMetaIter) { + if (frameLen == lossBufMetaIter->length && frameFin == lossBufMetaIter->eof) { + stream.lossBufMetas.erase(lossBufMetaIter); + } else { + CHECK_GT(lossBufMetaIter->length, frameLen); + lossBufMetaIter->length -= frameLen; + lossBufMetaIter->offset += frameLen; + } + CHECK(stream.retransmissionBufMetas + .emplace( + std::piecewise_construct, + std::forward_as_tuple(frameOffset), + std::forward_as_tuple(WriteBufferMeta::Builder() + .setOffset(frameOffset) + .setLength(frameLen) + .setEOF(frameFin) + .build())) + .second); +} + /** * Update the connection and stream state after stream data is written and deal * with new data, as well as retranmissions. Returns true if the data sent is @@ -431,34 +478,66 @@ bool handleStreamWritten( uint64_t frameLen, bool frameFin, PacketNum packetNum, - PacketNumberSpace packetNumberSpace) { + PacketNumberSpace packetNumberSpace, + bool fromBufMeta) { + auto writtenNewData = false; // Handle new data first - if (frameOffset == stream.currentWriteOffset) { + if (!fromBufMeta && frameOffset == stream.currentWriteOffset) { + handleNewStreamDataWritten(stream, frameLen, frameFin); + writtenNewData = true; + } + + if (fromBufMeta && stream.writeBufMeta.offset > 0 && + frameOffset == stream.writeBufMeta.offset) { + handleNewStreamBufMetaWritten(stream, frameLen, frameFin); + writtenNewData = true; + } + + if (writtenNewData) { // Count packet. It's based on the assumption that schedluing scheme will // only writes one STREAM frame for a stream in a packet. If that doesn't // hold, we need to avoid double-counting. - stream.numPacketsTxWithNewData++; - handleNewStreamDataWritten( - conn, stream, frameLen, frameFin, packetNum, packetNumberSpace); + ++stream.numPacketsTxWithNewData; + VLOG(10) << nodeToString(conn.nodeType) << " sent" + << " packetNum=" << packetNum << " space=" << packetNumberSpace + << " " << conn; return true; } - // If the data is in the loss buffer, it is a retransmission. - auto lossBufferIter = std::lower_bound( - stream.lossBuffer.begin(), - stream.lossBuffer.end(), - frameOffset, - [](const auto& buf, auto off) { return buf.offset < off; }); - if (lossBufferIter != stream.lossBuffer.end() && - lossBufferIter->offset == frameOffset) { - handleRetransmissionWritten( - conn, - stream, + bool writtenRetx = false; + if (!fromBufMeta) { + // If the data is in the loss buffer, it is a retransmission. + auto lossBufferIter = std::lower_bound( + stream.lossBuffer.begin(), + stream.lossBuffer.end(), frameOffset, - frameLen, - frameFin, - packetNum, - lossBufferIter); + [](const auto& buf, auto off) { return buf.offset < off; }); + if (lossBufferIter != stream.lossBuffer.end() && + lossBufferIter->offset == frameOffset) { + handleRetransmissionWritten( + stream, frameOffset, frameLen, frameFin, lossBufferIter); + writtenRetx = true; + } + } else { + auto lossBufMetaIter = std::lower_bound( + stream.lossBufMetas.begin(), + stream.lossBufMetas.end(), + frameOffset, + [](const auto& bufMeta, auto offset) { + return bufMeta.offset < offset; + }); + if (lossBufMetaIter != stream.lossBufMetas.end() && + lossBufMetaIter->offset == frameOffset) { + handleRetransmissionBufMetaWritten( + stream, frameOffset, frameLen, frameFin, lossBufMetaIter); + writtenRetx = true; + } + } + + if (writtenRetx) { + conn.lossState.totalBytesRetransmitted += frameLen; + VLOG(10) << nodeToString(conn.nodeType) << " sent retransmission" + << " packetNum=" << packetNum << " " << conn; QUIC_STATS(conn.statsCallback, onPacketRetransmission); return false; } @@ -504,7 +583,8 @@ void updateConnection( writeStreamFrame.len, writeStreamFrame.fin, packetNum, - packetNumberSpace); + packetNumberSpace, + writeStreamFrame.fromBufMeta); if (newStreamDataWritten) { updateFlowControlOnWriteToSocket(*stream, writeStreamFrame.len); maybeWriteBlockAfterSocketWrite(*stream); @@ -530,9 +610,10 @@ void updateConnection( *getCryptoStream(*conn.cryptoState, encryptionLevel), writeCryptoFrame.offset, writeCryptoFrame.len, - false, + false /* fin */, packetNum, - packetNumberSpace); + packetNumberSpace, + false /* fromBufMeta */); break; } case QuicWriteFrame::Type::WriteAckFrame: { diff --git a/quic/api/QuicTransportFunctions.h b/quic/api/QuicTransportFunctions.h index a254d9dbf..fb3d253ed 100644 --- a/quic/api/QuicTransportFunctions.h +++ b/quic/api/QuicTransportFunctions.h @@ -146,23 +146,18 @@ WriteDataReason hasNonAckDataToWrite(const QuicConnectionStateBase& conn); * Invoked when the written stream data was new stream data. */ void handleNewStreamDataWritten( - QuicConnectionStateBase& conn, QuicStreamLike& stream, uint64_t frameLen, - bool frameFin, - PacketNum packetNum, - PacketNumberSpace packetNumberSpace); + bool frameFin); /** * Invoked when the stream data written was retransmitted data. */ void handleRetransmissionWritten( - QuicConnectionStateBase& conn, QuicStreamLike& stream, uint64_t frameOffset, uint64_t frameLen, bool frameFin, - PacketNum packetNum, std::deque::iterator lossBufferIter); /** @@ -177,7 +172,8 @@ bool handleStreamWritten( uint64_t frameLen, bool frameFin, PacketNum packetNum, - PacketNumberSpace packetNumberSpace); + PacketNumberSpace packetNumberSpace, + bool fromBufMeta); /** * Update the connection state after sending a new packet. diff --git a/quic/api/test/QuicTransportFunctionsTest.cpp b/quic/api/test/QuicTransportFunctionsTest.cpp index 5cf20d758..4b258b28b 100644 --- a/quic/api/test/QuicTransportFunctionsTest.cpp +++ b/quic/api/test/QuicTransportFunctionsTest.cpp @@ -2612,5 +2612,62 @@ TEST_F(QuicTransportFunctionsTest, WriteD6DProbesWithInplaceBuilder) { EXPECT_EQ(0, bufPtr->headroom()); } +TEST_F(QuicTransportFunctionsTest, UpdateConnectionWithBufferMeta) { + auto conn = createConn(); + // Builds a fake packet to test with. + auto packet = buildEmptyPacket(*conn, PacketNumberSpace::AppData); + + auto streamId = + conn->streamManager->createNextBidirectionalStream().value()->id; + auto stream = conn->streamManager->findStream(streamId); + EXPECT_TRUE(stream->retransmissionBufMetas.empty()); + writeDataToQuicStream( + *stream, IOBuf::copyBuffer("Wear a face mask please!"), false /* eof */); + BufferMeta bufMeta(2000); + writeBufMetaToQuicStream(*stream, bufMeta, true /* eof */); + EXPECT_TRUE(stream->writeBufMeta.eof); + EXPECT_EQ(2000, stream->writeBufMeta.length); + auto bufMetaStartingOffset = stream->writeBufMeta.offset; + WriteStreamFrame writeStreamFrame( + streamId, bufMetaStartingOffset, 1000, false /* fin */); + writeStreamFrame.fromBufMeta = true; + packet.packet.frames.push_back(writeStreamFrame); + + updateConnection( + *conn, folly::none, packet.packet, TimePoint(), getEncodedSize(packet)); + EXPECT_EQ(1000 + bufMetaStartingOffset, stream->writeBufMeta.offset); + EXPECT_EQ(1000, stream->writeBufMeta.length); + EXPECT_FALSE(stream->retransmissionBufMetas.empty()); + auto retxBufMetaIter = + stream->retransmissionBufMetas.find(bufMetaStartingOffset); + EXPECT_NE(retxBufMetaIter, stream->retransmissionBufMetas.end()); + EXPECT_EQ(bufMetaStartingOffset, retxBufMetaIter->second.offset); + EXPECT_EQ(1000, retxBufMetaIter->second.length); + EXPECT_FALSE(retxBufMetaIter->second.eof); + + // Manually lose this packet: + stream->lossBufMetas.push_back(retxBufMetaIter->second); + stream->retransmissionBufMetas.erase(retxBufMetaIter); + ASSERT_FALSE(stream->lossBufMetas.empty()); + ASSERT_TRUE(stream->retransmissionBufMetas.empty()); + + // Retransmit it: + auto retxPacket = buildEmptyPacket(*conn, PacketNumberSpace::AppData); + // Retx of the stream looks exactly the same + retxPacket.packet.frames.push_back(writeStreamFrame); + updateConnection( + *conn, + folly::none, + retxPacket.packet, + TimePoint(), + getEncodedSize(retxPacket)); + EXPECT_TRUE(stream->lossBufMetas.empty()); + retxBufMetaIter = stream->retransmissionBufMetas.find(bufMetaStartingOffset); + EXPECT_NE(retxBufMetaIter, stream->retransmissionBufMetas.end()); + EXPECT_EQ(bufMetaStartingOffset, retxBufMetaIter->second.offset); + EXPECT_EQ(1000, retxBufMetaIter->second.length); + EXPECT_FALSE(retxBufMetaIter->second.eof); +} + } // namespace test } // namespace quic