From 4faaa83642be61de83a5712fdcc8e5a607f6a783 Mon Sep 17 00:00:00 2001 From: Konstantin Tsoy Date: Fri, 3 Jun 2022 15:47:17 -0700 Subject: [PATCH] Codec writing/reading of stream group ids Summary: Add writing and reading of stream group ids Reviewed By: mjoras Differential Revision: D36415929 fbshipit-source-id: 650bb8d6f81b2014741a517b165b4d27b7b6c0fe --- quic/QuicConstants.h | 9 +++++ quic/api/QuicPacketScheduler.cpp | 6 ++- quic/codec/Decode.cpp | 52 +++++++++++++++++++++---- quic/codec/Decode.h | 3 +- quic/codec/QuicPacketRebuilder.cpp | 3 +- quic/codec/QuicWriteCodec.cpp | 30 ++++++++++++--- quic/codec/QuicWriteCodec.h | 3 +- quic/codec/Types.cpp | 14 +++++++ quic/codec/Types.h | 1 + quic/codec/test/DecodeTest.cpp | 36 ++++++++++++++++- quic/codec/test/QuicWriteCodecTest.cpp | 53 ++++++++++++++++++++++++++ quic/logging/QLoggerConstants.cpp | 9 +++++ 12 files changed, 200 insertions(+), 19 deletions(-) diff --git a/quic/QuicConstants.h b/quic/QuicConstants.h index 8ac5a330c..1d665528c 100644 --- a/quic/QuicConstants.h +++ b/quic/QuicConstants.h @@ -193,6 +193,15 @@ enum class FrameType : uint64_t { DATAGRAM_LEN = 0x31, KNOB = 0x1550, ACK_FREQUENCY = 0xAF, + // Stream groups. + GROUP_STREAM = 0x32, + GROUP_STREAM_FIN = 0x33, + GROUP_STREAM_LEN = 0x34, + GROUP_STREAM_LEN_FIN = 0x35, + GROUP_STREAM_OFF = 0x36, + GROUP_STREAM_OFF_FIN = 0x37, + GROUP_STREAM_OFF_LEN = 0x38, + GROUP_STREAM_OFF_LEN_FIN = 0x39, }; inline constexpr uint16_t toFrameError(FrameType frame) { diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index 42ba0a9f9..5d6425943 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -358,7 +358,8 @@ bool StreamFrameScheduler::writeStreamLossBuffers( bufferLen, // writeBufferLen -- only the len of the single buffer. bufferLen, // flowControlLen -- not relevant, already flow controlled. buffer->eof, - folly::none /* skipLenHint */); + folly::none /* skipLenHint */, + stream.groupId); if (dataLen) { wroteStreamFrame = true; writeStreamFrameData(builder, buffer->data, *dataLen); @@ -531,7 +532,8 @@ bool StreamFrameScheduler::writeStreamFrame( bufferLen, flowControlLen, canWriteFin, - folly::none /* skipLenHint */); + folly::none /* skipLenHint */, + stream.groupId); if (!dataLen) { return false; } diff --git a/quic/codec/Decode.cpp b/quic/codec/Decode.cpp index 247cef177..53510cf7b 100644 --- a/quic/codec/Decode.cpp +++ b/quic/codec/Decode.cpp @@ -364,15 +364,32 @@ ReadNewTokenFrame decodeNewTokenFrame(folly::io::Cursor& cursor) { ReadStreamFrame decodeStreamFrame( BufQueue& queue, - StreamTypeField frameTypeField) { + StreamTypeField frameTypeField, + bool isGroupFrame) { + const quic::FrameType frameType = + isGroupFrame ? quic::FrameType::GROUP_STREAM : quic::FrameType::STREAM; folly::io::Cursor cursor(queue.front()); + auto streamId = decodeQuicInteger(cursor); if (!streamId) { throw QuicTransportException( "Invalid stream id", quic::TransportErrorCode::FRAME_ENCODING_ERROR, - quic::FrameType::STREAM); + frameType); } + + folly::Optional groupId; + if (isGroupFrame) { + auto gId = decodeQuicInteger(cursor); + if (!gId) { + throw QuicTransportException( + "Invalid group stream id", + quic::TransportErrorCode::FRAME_ENCODING_ERROR, + frameType); + } + groupId = gId->first; + } + uint64_t offset = 0; if (frameTypeField.hasOffset()) { auto optionalOffset = decodeQuicInteger(cursor); @@ -380,7 +397,7 @@ ReadStreamFrame decodeStreamFrame( throw QuicTransportException( "Invalid offset", quic::TransportErrorCode::FRAME_ENCODING_ERROR, - quic::FrameType::STREAM); + frameType); } offset = optionalOffset->first; } @@ -392,7 +409,7 @@ ReadStreamFrame decodeStreamFrame( throw QuicTransportException( "Invalid length", quic::TransportErrorCode::FRAME_ENCODING_ERROR, - quic::FrameType::STREAM); + frameType); } } Buf data; @@ -401,7 +418,7 @@ ReadStreamFrame decodeStreamFrame( throw QuicTransportException( "Length mismatch", quic::TransportErrorCode::FRAME_ENCODING_ERROR, - quic::FrameType::STREAM); + frameType); } // If dataLength > data's actual length then the cursor will throw. queue.trimStart(cursor - queue.front()); @@ -413,7 +430,11 @@ ReadStreamFrame decodeStreamFrame( data = queue.move(); } return ReadStreamFrame( - folly::to(streamId->first), offset, std::move(data), fin); + folly::to(streamId->first), + offset, + std::move(data), + fin, + groupId); } MaxDataFrame decodeMaxDataFrame(folly::io::Cursor& cursor) { @@ -756,8 +777,23 @@ QuicFrame parseFrame( case FrameType::STREAM_OFF_LEN: case FrameType::STREAM_OFF_LEN_FIN: consumedQueue = true; - return QuicFrame( - decodeStreamFrame(queue, StreamTypeField(frameTypeInt->first))); + return QuicFrame(decodeStreamFrame( + queue, + StreamTypeField(frameTypeInt->first), + false /* isGroupFrame */)); + case FrameType::GROUP_STREAM: + case FrameType::GROUP_STREAM_FIN: + case FrameType::GROUP_STREAM_LEN: + case FrameType::GROUP_STREAM_LEN_FIN: + case FrameType::GROUP_STREAM_OFF: + case FrameType::GROUP_STREAM_OFF_FIN: + case FrameType::GROUP_STREAM_OFF_LEN: + case FrameType::GROUP_STREAM_OFF_LEN_FIN: + consumedQueue = true; + return QuicFrame(decodeStreamFrame( + queue, + StreamTypeField(frameTypeInt->first), + true /* isGroupFrame */)); case FrameType::MAX_DATA: return QuicFrame(decodeMaxDataFrame(cursor)); case FrameType::MAX_STREAM_DATA: diff --git a/quic/codec/Decode.h b/quic/codec/Decode.h index 158d5da62..abede7493 100644 --- a/quic/codec/Decode.h +++ b/quic/codec/Decode.h @@ -124,7 +124,8 @@ ReadAckFrame decodeAckFrameWithECN( ReadStreamFrame decodeStreamFrame( BufQueue& queue, - StreamTypeField frameTypeField); + StreamTypeField frameTypeField, + bool isGroupFrame = false); ReadCryptoFrame decodeCryptoFrame(folly::io::Cursor& cursor); diff --git a/quic/codec/QuicPacketRebuilder.cpp b/quic/codec/QuicPacketRebuilder.cpp index b458992bd..d6787fb00 100644 --- a/quic/codec/QuicPacketRebuilder.cpp +++ b/quic/codec/QuicPacketRebuilder.cpp @@ -97,7 +97,8 @@ folly::Optional PacketRebuilder::rebuildFromPacket( // It's safe to skip the length if it was the last frame in the // original packet and there's no ACK frame. Since we put the ACK // frame last we need to end the stream frame in that case. - lastFrame && bufferLen && !hasAckFrame); + lastFrame && bufferLen && !hasAckFrame, + streamFrame.streamGroupId); bool ret = dataLen.has_value() && *dataLen == streamFrame.len; if (ret) { // Writing 0 byte for stream data is legit if the stream frame has diff --git a/quic/codec/QuicWriteCodec.cpp b/quic/codec/QuicWriteCodec.cpp index b93a0c95d..69ce2e87b 100644 --- a/quic/codec/QuicWriteCodec.cpp +++ b/quic/codec/QuicWriteCodec.cpp @@ -35,7 +35,8 @@ folly::Optional writeStreamFrameHeader( uint64_t writeBufferLen, uint64_t flowControlLen, bool fin, - folly::Optional skipLenHint) { + folly::Optional skipLenHint, + folly::Optional streamGroupId) { if (builder.remainingSpaceInPkt() == 0) { return folly::none; } @@ -45,10 +46,21 @@ folly::Optional writeStreamFrameHeader( LocalErrorCode::INTERNAL_ERROR); } StreamTypeField::Builder streamTypeBuilder; + if (streamGroupId) { + streamTypeBuilder.switchToStreamGroups(); + } QuicInteger idInt(id); - // First account for the things that are non-optional: frame type and stream - // id. + folly::Optional groupIdInt; + if (streamGroupId) { + groupIdInt = QuicInteger(*streamGroupId); + } + + // First account for the things that are non-optional: frame type, stream id + // and (optional) group id. uint64_t headerSize = sizeof(uint8_t) + idInt.getSize(); + if (groupIdInt) { + headerSize += groupIdInt->getSize(); + } if (builder.remainingSpaceInPkt() < headerSize) { VLOG(4) << "No space in packet for stream header. stream=" << id << " remaining=" << builder.remainingSpaceInPkt(); @@ -131,14 +143,22 @@ folly::Optional writeStreamFrameHeader( auto streamType = streamTypeBuilder.build(); builder.writeBE(streamType.fieldValue()); builder.write(idInt); + if (groupIdInt) { + builder.write(*groupIdInt); + } if (offset != 0) { builder.write(offsetInt); } if (dataLenLen > 0) { builder.write(QuicInteger(dataLen)); } - builder.appendFrame( - WriteStreamFrame(id, offset, dataLen, streamType.hasFin())); + builder.appendFrame(WriteStreamFrame( + id, + offset, + dataLen, + streamType.hasFin(), + false /* fromBufMetaIn */, + streamGroupId)); DCHECK(dataLen <= builder.remainingSpaceInPkt()); return folly::make_optional(dataLen); } diff --git a/quic/codec/QuicWriteCodec.h b/quic/codec/QuicWriteCodec.h index 90e98d0aa..9e805a443 100644 --- a/quic/codec/QuicWriteCodec.h +++ b/quic/codec/QuicWriteCodec.h @@ -77,7 +77,8 @@ folly::Optional writeStreamFrameHeader( uint64_t writeBufferLen, uint64_t flowControlLen, bool fin, - folly::Optional skipLenHint); + folly::Optional skipLenHint, + folly::Optional streamGroupId = folly::none); /** * Write stream frama data into builder diff --git a/quic/codec/Types.cpp b/quic/codec/Types.cpp index 4bbcad040..96e78797b 100644 --- a/quic/codec/Types.cpp +++ b/quic/codec/Types.cpp @@ -294,6 +294,11 @@ uint8_t StreamTypeField::fieldValue() const { return field_; } +StreamTypeField::Builder& StreamTypeField::Builder::switchToStreamGroups() { + field_ = static_cast(FrameType::GROUP_STREAM); + return *this; +} + StreamTypeField::Builder& StreamTypeField::Builder::setFin() { field_ |= StreamTypeField::kFinBit; return *this; @@ -430,6 +435,15 @@ std::string toString(FrameType frame) { return "KNOB"; case FrameType::ACK_FREQUENCY: return "ACK_FREQUENCY"; + case FrameType::GROUP_STREAM: + case FrameType::GROUP_STREAM_FIN: + case FrameType::GROUP_STREAM_LEN: + case FrameType::GROUP_STREAM_LEN_FIN: + case FrameType::GROUP_STREAM_OFF: + case FrameType::GROUP_STREAM_OFF_FIN: + case FrameType::GROUP_STREAM_OFF_LEN: + case FrameType::GROUP_STREAM_OFF_LEN_FIN: + return "GROUP_STREAM"; } LOG(WARNING) << "toString has unhandled frame type"; return "UNKNOWN"; diff --git a/quic/codec/Types.h b/quic/codec/Types.h index e2d2a778a..0e173d00d 100644 --- a/quic/codec/Types.h +++ b/quic/codec/Types.h @@ -1003,6 +1003,7 @@ struct StreamTypeField { struct Builder { public: Builder() : field_(static_cast(FrameType::STREAM)) {} + Builder& switchToStreamGroups(); Builder& setFin(); Builder& setOffset(); Builder& setLength(); diff --git a/quic/codec/test/DecodeTest.cpp b/quic/codec/test/DecodeTest.cpp index 15404b017..4e839d189 100644 --- a/quic/codec/test/DecodeTest.cpp +++ b/quic/codec/test/DecodeTest.cpp @@ -85,7 +85,8 @@ std::unique_ptr createStreamFrame( folly::Optional offset = folly::none, folly::Optional dataLength = folly::none, Buf data = nullptr, - bool useRealValuesForStreamId = false) { + bool useRealValuesForStreamId = false, + folly::Optional groupId = folly::none) { std::unique_ptr streamFrame = folly::IOBuf::create(0); BufAppender wcursor(streamFrame.get(), 10); auto appenderOp = [&](auto val) { wcursor.writeBE(val); }; @@ -96,6 +97,9 @@ std::unique_ptr createStreamFrame( streamId->encode(appenderOp); } } + if (groupId) { + groupId->encode(appenderOp); + } if (offset) { offset->encode(appenderOp); } @@ -776,5 +780,35 @@ TEST_F(DecodeTest, ParsePlaintextRetryToken) { EXPECT_EQ(parseResult.value(), timestampInMs); } +TEST_F(DecodeTest, StreamGroupDecodeSuccess) { + QuicInteger streamId(10); + QuicInteger groupId(20); + QuicInteger offset(10); + QuicInteger length(1); + auto streamType = StreamTypeField::Builder() + .switchToStreamGroups() + .setFin() + .setOffset() + .setLength() + .build(); + + auto streamFrame = createStreamFrame( + streamId, + offset, + length, + folly::IOBuf::copyBuffer("a"), + false /* useRealValuesForStreamId */, + groupId); + BufQueue queue; + queue.append(streamFrame->clone()); + auto decodedFrame = + decodeStreamFrame(queue, streamType, true /* isGroupFrame */); + EXPECT_EQ(decodedFrame.offset, 10); + EXPECT_EQ(decodedFrame.data->computeChainDataLength(), 1); + EXPECT_EQ(decodedFrame.streamId, 10); + EXPECT_EQ(*decodedFrame.streamGroupId, 20); + EXPECT_TRUE(decodedFrame.fin); +} + } // namespace test } // namespace quic diff --git a/quic/codec/test/QuicWriteCodecTest.cpp b/quic/codec/test/QuicWriteCodecTest.cpp index 0f6dc64d8..f05a2974d 100644 --- a/quic/codec/test/QuicWriteCodecTest.cpp +++ b/quic/codec/test/QuicWriteCodecTest.cpp @@ -1624,5 +1624,58 @@ TEST_F(QuicWriteCodecTest, WritePathResponse) { EXPECT_EQ(wirePathResponseFrame.pathData, pathData); EXPECT_EQ(queue.chainLength(), 0); } + +TEST_F(QuicWriteCodecTest, WriteStreamFrameWithGroup) { + MockQuicPacketBuilder pktBuilder; + pktBuilder.remaining_ = 1300; + setupCommonExpects(pktBuilder); + auto inputBuf = buildRandomInputData(50); + + StreamId streamId = 4; + StreamGroupId groupId = 64; + uint64_t offset = 0; + bool fin = true; + + auto dataLen = writeStreamFrameHeader( + pktBuilder, + streamId, + offset, + 50, + 50, + fin, + folly::none /* skipLenHint */, + groupId); + ASSERT_TRUE(dataLen); + ASSERT_EQ(*dataLen, 50); + writeStreamFrameData(pktBuilder, inputBuf->clone(), 50); + + auto outputBuf = pktBuilder.data_->clone(); + EXPECT_EQ(outputBuf->computeChainDataLength(), 55); + + auto builtOut = std::move(pktBuilder).buildTestPacket(); + auto regularPacket = builtOut.first; + EXPECT_EQ(regularPacket.frames.size(), 1); + auto& resultFrame = *regularPacket.frames.back().asWriteStreamFrame(); + EXPECT_EQ(resultFrame.streamId, streamId); + EXPECT_EQ(resultFrame.streamGroupId, groupId); + EXPECT_EQ(resultFrame.offset, offset); + EXPECT_EQ(resultFrame.len, 50); + + // Verify the on wire bytes via decoder. + auto wireBuf = std::move(builtOut.second); + BufQueue queue; + queue.append(wireBuf->clone()); + QuicFrame streamFrameDecoded = quic::parseFrame( + queue, + regularPacket.header, + CodecParameters(kDefaultAckDelayExponent, QuicVersion::MVFST)); + auto& decodedStreamFrame = *streamFrameDecoded.asReadStreamFrame(); + EXPECT_EQ(decodedStreamFrame.streamId, streamId); + EXPECT_EQ(decodedStreamFrame.streamGroupId, groupId); + EXPECT_EQ(decodedStreamFrame.offset, offset); + EXPECT_EQ(decodedStreamFrame.data->computeChainDataLength(), 50); + EXPECT_TRUE(folly::IOBufEqualTo()(inputBuf, decodedStreamFrame.data)); +} + } // namespace test } // namespace quic diff --git a/quic/logging/QLoggerConstants.cpp b/quic/logging/QLoggerConstants.cpp index baf5cee7e..f518afb91 100644 --- a/quic/logging/QLoggerConstants.cpp +++ b/quic/logging/QLoggerConstants.cpp @@ -79,6 +79,15 @@ folly::StringPiece toQlogString(FrameType frame) { return "knob"; case FrameType::ACK_FREQUENCY: return "ack_frequency"; + case FrameType::GROUP_STREAM: + case FrameType::GROUP_STREAM_FIN: + case FrameType::GROUP_STREAM_LEN: + case FrameType::GROUP_STREAM_LEN_FIN: + case FrameType::GROUP_STREAM_OFF: + case FrameType::GROUP_STREAM_OFF_FIN: + case FrameType::GROUP_STREAM_OFF_LEN: + case FrameType::GROUP_STREAM_OFF_LEN_FIN: + return "group_stream"; } folly::assume_unreachable(); }