1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-07-30 14:43:05 +03:00

Codec writing/reading of stream group ids

Summary: Add writing and reading of stream group ids

Reviewed By: mjoras

Differential Revision: D36415929

fbshipit-source-id: 650bb8d6f81b2014741a517b165b4d27b7b6c0fe
This commit is contained in:
Konstantin Tsoy
2022-06-03 15:47:17 -07:00
committed by Facebook GitHub Bot
parent 3445442000
commit 4faaa83642
12 changed files with 200 additions and 19 deletions

View File

@ -193,6 +193,15 @@ enum class FrameType : uint64_t {
DATAGRAM_LEN = 0x31, DATAGRAM_LEN = 0x31,
KNOB = 0x1550, KNOB = 0x1550,
ACK_FREQUENCY = 0xAF, ACK_FREQUENCY = 0xAF,
// Stream groups.
GROUP_STREAM = 0x32,
GROUP_STREAM_FIN = 0x33,
GROUP_STREAM_LEN = 0x34,
GROUP_STREAM_LEN_FIN = 0x35,
GROUP_STREAM_OFF = 0x36,
GROUP_STREAM_OFF_FIN = 0x37,
GROUP_STREAM_OFF_LEN = 0x38,
GROUP_STREAM_OFF_LEN_FIN = 0x39,
}; };
inline constexpr uint16_t toFrameError(FrameType frame) { inline constexpr uint16_t toFrameError(FrameType frame) {

View File

@ -358,7 +358,8 @@ bool StreamFrameScheduler::writeStreamLossBuffers(
bufferLen, // writeBufferLen -- only the len of the single buffer. bufferLen, // writeBufferLen -- only the len of the single buffer.
bufferLen, // flowControlLen -- not relevant, already flow controlled. bufferLen, // flowControlLen -- not relevant, already flow controlled.
buffer->eof, buffer->eof,
folly::none /* skipLenHint */); folly::none /* skipLenHint */,
stream.groupId);
if (dataLen) { if (dataLen) {
wroteStreamFrame = true; wroteStreamFrame = true;
writeStreamFrameData(builder, buffer->data, *dataLen); writeStreamFrameData(builder, buffer->data, *dataLen);
@ -531,7 +532,8 @@ bool StreamFrameScheduler::writeStreamFrame(
bufferLen, bufferLen,
flowControlLen, flowControlLen,
canWriteFin, canWriteFin,
folly::none /* skipLenHint */); folly::none /* skipLenHint */,
stream.groupId);
if (!dataLen) { if (!dataLen) {
return false; return false;
} }

View File

@ -364,15 +364,32 @@ ReadNewTokenFrame decodeNewTokenFrame(folly::io::Cursor& cursor) {
ReadStreamFrame decodeStreamFrame( ReadStreamFrame decodeStreamFrame(
BufQueue& queue, BufQueue& queue,
StreamTypeField frameTypeField) { StreamTypeField frameTypeField,
bool isGroupFrame) {
const quic::FrameType frameType =
isGroupFrame ? quic::FrameType::GROUP_STREAM : quic::FrameType::STREAM;
folly::io::Cursor cursor(queue.front()); folly::io::Cursor cursor(queue.front());
auto streamId = decodeQuicInteger(cursor); auto streamId = decodeQuicInteger(cursor);
if (!streamId) { if (!streamId) {
throw QuicTransportException( throw QuicTransportException(
"Invalid stream id", "Invalid stream id",
quic::TransportErrorCode::FRAME_ENCODING_ERROR, quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::STREAM); frameType);
} }
folly::Optional<StreamGroupId> groupId;
if (isGroupFrame) {
auto gId = decodeQuicInteger(cursor);
if (!gId) {
throw QuicTransportException(
"Invalid group stream id",
quic::TransportErrorCode::FRAME_ENCODING_ERROR,
frameType);
}
groupId = gId->first;
}
uint64_t offset = 0; uint64_t offset = 0;
if (frameTypeField.hasOffset()) { if (frameTypeField.hasOffset()) {
auto optionalOffset = decodeQuicInteger(cursor); auto optionalOffset = decodeQuicInteger(cursor);
@ -380,7 +397,7 @@ ReadStreamFrame decodeStreamFrame(
throw QuicTransportException( throw QuicTransportException(
"Invalid offset", "Invalid offset",
quic::TransportErrorCode::FRAME_ENCODING_ERROR, quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::STREAM); frameType);
} }
offset = optionalOffset->first; offset = optionalOffset->first;
} }
@ -392,7 +409,7 @@ ReadStreamFrame decodeStreamFrame(
throw QuicTransportException( throw QuicTransportException(
"Invalid length", "Invalid length",
quic::TransportErrorCode::FRAME_ENCODING_ERROR, quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::STREAM); frameType);
} }
} }
Buf data; Buf data;
@ -401,7 +418,7 @@ ReadStreamFrame decodeStreamFrame(
throw QuicTransportException( throw QuicTransportException(
"Length mismatch", "Length mismatch",
quic::TransportErrorCode::FRAME_ENCODING_ERROR, quic::TransportErrorCode::FRAME_ENCODING_ERROR,
quic::FrameType::STREAM); frameType);
} }
// If dataLength > data's actual length then the cursor will throw. // If dataLength > data's actual length then the cursor will throw.
queue.trimStart(cursor - queue.front()); queue.trimStart(cursor - queue.front());
@ -413,7 +430,11 @@ ReadStreamFrame decodeStreamFrame(
data = queue.move(); data = queue.move();
} }
return ReadStreamFrame( return ReadStreamFrame(
folly::to<StreamId>(streamId->first), offset, std::move(data), fin); folly::to<StreamId>(streamId->first),
offset,
std::move(data),
fin,
groupId);
} }
MaxDataFrame decodeMaxDataFrame(folly::io::Cursor& cursor) { MaxDataFrame decodeMaxDataFrame(folly::io::Cursor& cursor) {
@ -756,8 +777,23 @@ QuicFrame parseFrame(
case FrameType::STREAM_OFF_LEN: case FrameType::STREAM_OFF_LEN:
case FrameType::STREAM_OFF_LEN_FIN: case FrameType::STREAM_OFF_LEN_FIN:
consumedQueue = true; consumedQueue = true;
return QuicFrame( return QuicFrame(decodeStreamFrame(
decodeStreamFrame(queue, StreamTypeField(frameTypeInt->first))); queue,
StreamTypeField(frameTypeInt->first),
false /* isGroupFrame */));
case FrameType::GROUP_STREAM:
case FrameType::GROUP_STREAM_FIN:
case FrameType::GROUP_STREAM_LEN:
case FrameType::GROUP_STREAM_LEN_FIN:
case FrameType::GROUP_STREAM_OFF:
case FrameType::GROUP_STREAM_OFF_FIN:
case FrameType::GROUP_STREAM_OFF_LEN:
case FrameType::GROUP_STREAM_OFF_LEN_FIN:
consumedQueue = true;
return QuicFrame(decodeStreamFrame(
queue,
StreamTypeField(frameTypeInt->first),
true /* isGroupFrame */));
case FrameType::MAX_DATA: case FrameType::MAX_DATA:
return QuicFrame(decodeMaxDataFrame(cursor)); return QuicFrame(decodeMaxDataFrame(cursor));
case FrameType::MAX_STREAM_DATA: case FrameType::MAX_STREAM_DATA:

View File

@ -124,7 +124,8 @@ ReadAckFrame decodeAckFrameWithECN(
ReadStreamFrame decodeStreamFrame( ReadStreamFrame decodeStreamFrame(
BufQueue& queue, BufQueue& queue,
StreamTypeField frameTypeField); StreamTypeField frameTypeField,
bool isGroupFrame = false);
ReadCryptoFrame decodeCryptoFrame(folly::io::Cursor& cursor); ReadCryptoFrame decodeCryptoFrame(folly::io::Cursor& cursor);

View File

@ -97,7 +97,8 @@ folly::Optional<PacketEvent> PacketRebuilder::rebuildFromPacket(
// It's safe to skip the length if it was the last frame in the // It's safe to skip the length if it was the last frame in the
// original packet and there's no ACK frame. Since we put the ACK // original packet and there's no ACK frame. Since we put the ACK
// frame last we need to end the stream frame in that case. // frame last we need to end the stream frame in that case.
lastFrame && bufferLen && !hasAckFrame); lastFrame && bufferLen && !hasAckFrame,
streamFrame.streamGroupId);
bool ret = dataLen.has_value() && *dataLen == streamFrame.len; bool ret = dataLen.has_value() && *dataLen == streamFrame.len;
if (ret) { if (ret) {
// Writing 0 byte for stream data is legit if the stream frame has // Writing 0 byte for stream data is legit if the stream frame has

View File

@ -35,7 +35,8 @@ folly::Optional<uint64_t> writeStreamFrameHeader(
uint64_t writeBufferLen, uint64_t writeBufferLen,
uint64_t flowControlLen, uint64_t flowControlLen,
bool fin, bool fin,
folly::Optional<bool> skipLenHint) { folly::Optional<bool> skipLenHint,
folly::Optional<StreamGroupId> streamGroupId) {
if (builder.remainingSpaceInPkt() == 0) { if (builder.remainingSpaceInPkt() == 0) {
return folly::none; return folly::none;
} }
@ -45,10 +46,21 @@ folly::Optional<uint64_t> writeStreamFrameHeader(
LocalErrorCode::INTERNAL_ERROR); LocalErrorCode::INTERNAL_ERROR);
} }
StreamTypeField::Builder streamTypeBuilder; StreamTypeField::Builder streamTypeBuilder;
if (streamGroupId) {
streamTypeBuilder.switchToStreamGroups();
}
QuicInteger idInt(id); QuicInteger idInt(id);
// First account for the things that are non-optional: frame type and stream folly::Optional<QuicInteger> groupIdInt;
// id. if (streamGroupId) {
groupIdInt = QuicInteger(*streamGroupId);
}
// First account for the things that are non-optional: frame type, stream id
// and (optional) group id.
uint64_t headerSize = sizeof(uint8_t) + idInt.getSize(); uint64_t headerSize = sizeof(uint8_t) + idInt.getSize();
if (groupIdInt) {
headerSize += groupIdInt->getSize();
}
if (builder.remainingSpaceInPkt() < headerSize) { if (builder.remainingSpaceInPkt() < headerSize) {
VLOG(4) << "No space in packet for stream header. stream=" << id VLOG(4) << "No space in packet for stream header. stream=" << id
<< " remaining=" << builder.remainingSpaceInPkt(); << " remaining=" << builder.remainingSpaceInPkt();
@ -131,14 +143,22 @@ folly::Optional<uint64_t> writeStreamFrameHeader(
auto streamType = streamTypeBuilder.build(); auto streamType = streamTypeBuilder.build();
builder.writeBE(streamType.fieldValue()); builder.writeBE(streamType.fieldValue());
builder.write(idInt); builder.write(idInt);
if (groupIdInt) {
builder.write(*groupIdInt);
}
if (offset != 0) { if (offset != 0) {
builder.write(offsetInt); builder.write(offsetInt);
} }
if (dataLenLen > 0) { if (dataLenLen > 0) {
builder.write(QuicInteger(dataLen)); builder.write(QuicInteger(dataLen));
} }
builder.appendFrame( builder.appendFrame(WriteStreamFrame(
WriteStreamFrame(id, offset, dataLen, streamType.hasFin())); id,
offset,
dataLen,
streamType.hasFin(),
false /* fromBufMetaIn */,
streamGroupId));
DCHECK(dataLen <= builder.remainingSpaceInPkt()); DCHECK(dataLen <= builder.remainingSpaceInPkt());
return folly::make_optional(dataLen); return folly::make_optional(dataLen);
} }

View File

@ -77,7 +77,8 @@ folly::Optional<uint64_t> writeStreamFrameHeader(
uint64_t writeBufferLen, uint64_t writeBufferLen,
uint64_t flowControlLen, uint64_t flowControlLen,
bool fin, bool fin,
folly::Optional<bool> skipLenHint); folly::Optional<bool> skipLenHint,
folly::Optional<StreamGroupId> streamGroupId = folly::none);
/** /**
* Write stream frama data into builder * Write stream frama data into builder

View File

@ -294,6 +294,11 @@ uint8_t StreamTypeField::fieldValue() const {
return field_; return field_;
} }
StreamTypeField::Builder& StreamTypeField::Builder::switchToStreamGroups() {
field_ = static_cast<uint8_t>(FrameType::GROUP_STREAM);
return *this;
}
StreamTypeField::Builder& StreamTypeField::Builder::setFin() { StreamTypeField::Builder& StreamTypeField::Builder::setFin() {
field_ |= StreamTypeField::kFinBit; field_ |= StreamTypeField::kFinBit;
return *this; return *this;
@ -430,6 +435,15 @@ std::string toString(FrameType frame) {
return "KNOB"; return "KNOB";
case FrameType::ACK_FREQUENCY: case FrameType::ACK_FREQUENCY:
return "ACK_FREQUENCY"; return "ACK_FREQUENCY";
case FrameType::GROUP_STREAM:
case FrameType::GROUP_STREAM_FIN:
case FrameType::GROUP_STREAM_LEN:
case FrameType::GROUP_STREAM_LEN_FIN:
case FrameType::GROUP_STREAM_OFF:
case FrameType::GROUP_STREAM_OFF_FIN:
case FrameType::GROUP_STREAM_OFF_LEN:
case FrameType::GROUP_STREAM_OFF_LEN_FIN:
return "GROUP_STREAM";
} }
LOG(WARNING) << "toString has unhandled frame type"; LOG(WARNING) << "toString has unhandled frame type";
return "UNKNOWN"; return "UNKNOWN";

View File

@ -1003,6 +1003,7 @@ struct StreamTypeField {
struct Builder { struct Builder {
public: public:
Builder() : field_(static_cast<uint8_t>(FrameType::STREAM)) {} Builder() : field_(static_cast<uint8_t>(FrameType::STREAM)) {}
Builder& switchToStreamGroups();
Builder& setFin(); Builder& setFin();
Builder& setOffset(); Builder& setOffset();
Builder& setLength(); Builder& setLength();

View File

@ -85,7 +85,8 @@ std::unique_ptr<folly::IOBuf> createStreamFrame(
folly::Optional<QuicInteger> offset = folly::none, folly::Optional<QuicInteger> offset = folly::none,
folly::Optional<QuicInteger> dataLength = folly::none, folly::Optional<QuicInteger> dataLength = folly::none,
Buf data = nullptr, Buf data = nullptr,
bool useRealValuesForStreamId = false) { bool useRealValuesForStreamId = false,
folly::Optional<QuicInteger> groupId = folly::none) {
std::unique_ptr<folly::IOBuf> streamFrame = folly::IOBuf::create(0); std::unique_ptr<folly::IOBuf> streamFrame = folly::IOBuf::create(0);
BufAppender wcursor(streamFrame.get(), 10); BufAppender wcursor(streamFrame.get(), 10);
auto appenderOp = [&](auto val) { wcursor.writeBE(val); }; auto appenderOp = [&](auto val) { wcursor.writeBE(val); };
@ -96,6 +97,9 @@ std::unique_ptr<folly::IOBuf> createStreamFrame(
streamId->encode(appenderOp); streamId->encode(appenderOp);
} }
} }
if (groupId) {
groupId->encode(appenderOp);
}
if (offset) { if (offset) {
offset->encode(appenderOp); offset->encode(appenderOp);
} }
@ -776,5 +780,35 @@ TEST_F(DecodeTest, ParsePlaintextRetryToken) {
EXPECT_EQ(parseResult.value(), timestampInMs); EXPECT_EQ(parseResult.value(), timestampInMs);
} }
TEST_F(DecodeTest, StreamGroupDecodeSuccess) {
QuicInteger streamId(10);
QuicInteger groupId(20);
QuicInteger offset(10);
QuicInteger length(1);
auto streamType = StreamTypeField::Builder()
.switchToStreamGroups()
.setFin()
.setOffset()
.setLength()
.build();
auto streamFrame = createStreamFrame(
streamId,
offset,
length,
folly::IOBuf::copyBuffer("a"),
false /* useRealValuesForStreamId */,
groupId);
BufQueue queue;
queue.append(streamFrame->clone());
auto decodedFrame =
decodeStreamFrame(queue, streamType, true /* isGroupFrame */);
EXPECT_EQ(decodedFrame.offset, 10);
EXPECT_EQ(decodedFrame.data->computeChainDataLength(), 1);
EXPECT_EQ(decodedFrame.streamId, 10);
EXPECT_EQ(*decodedFrame.streamGroupId, 20);
EXPECT_TRUE(decodedFrame.fin);
}
} // namespace test } // namespace test
} // namespace quic } // namespace quic

View File

@ -1624,5 +1624,58 @@ TEST_F(QuicWriteCodecTest, WritePathResponse) {
EXPECT_EQ(wirePathResponseFrame.pathData, pathData); EXPECT_EQ(wirePathResponseFrame.pathData, pathData);
EXPECT_EQ(queue.chainLength(), 0); EXPECT_EQ(queue.chainLength(), 0);
} }
TEST_F(QuicWriteCodecTest, WriteStreamFrameWithGroup) {
MockQuicPacketBuilder pktBuilder;
pktBuilder.remaining_ = 1300;
setupCommonExpects(pktBuilder);
auto inputBuf = buildRandomInputData(50);
StreamId streamId = 4;
StreamGroupId groupId = 64;
uint64_t offset = 0;
bool fin = true;
auto dataLen = writeStreamFrameHeader(
pktBuilder,
streamId,
offset,
50,
50,
fin,
folly::none /* skipLenHint */,
groupId);
ASSERT_TRUE(dataLen);
ASSERT_EQ(*dataLen, 50);
writeStreamFrameData(pktBuilder, inputBuf->clone(), 50);
auto outputBuf = pktBuilder.data_->clone();
EXPECT_EQ(outputBuf->computeChainDataLength(), 55);
auto builtOut = std::move(pktBuilder).buildTestPacket();
auto regularPacket = builtOut.first;
EXPECT_EQ(regularPacket.frames.size(), 1);
auto& resultFrame = *regularPacket.frames.back().asWriteStreamFrame();
EXPECT_EQ(resultFrame.streamId, streamId);
EXPECT_EQ(resultFrame.streamGroupId, groupId);
EXPECT_EQ(resultFrame.offset, offset);
EXPECT_EQ(resultFrame.len, 50);
// Verify the on wire bytes via decoder.
auto wireBuf = std::move(builtOut.second);
BufQueue queue;
queue.append(wireBuf->clone());
QuicFrame streamFrameDecoded = quic::parseFrame(
queue,
regularPacket.header,
CodecParameters(kDefaultAckDelayExponent, QuicVersion::MVFST));
auto& decodedStreamFrame = *streamFrameDecoded.asReadStreamFrame();
EXPECT_EQ(decodedStreamFrame.streamId, streamId);
EXPECT_EQ(decodedStreamFrame.streamGroupId, groupId);
EXPECT_EQ(decodedStreamFrame.offset, offset);
EXPECT_EQ(decodedStreamFrame.data->computeChainDataLength(), 50);
EXPECT_TRUE(folly::IOBufEqualTo()(inputBuf, decodedStreamFrame.data));
}
} // namespace test } // namespace test
} // namespace quic } // namespace quic

View File

@ -79,6 +79,15 @@ folly::StringPiece toQlogString(FrameType frame) {
return "knob"; return "knob";
case FrameType::ACK_FREQUENCY: case FrameType::ACK_FREQUENCY:
return "ack_frequency"; return "ack_frequency";
case FrameType::GROUP_STREAM:
case FrameType::GROUP_STREAM_FIN:
case FrameType::GROUP_STREAM_LEN:
case FrameType::GROUP_STREAM_LEN_FIN:
case FrameType::GROUP_STREAM_OFF:
case FrameType::GROUP_STREAM_OFF_FIN:
case FrameType::GROUP_STREAM_OFF_LEN:
case FrameType::GROUP_STREAM_OFF_LEN_FIN:
return "group_stream";
} }
folly::assume_unreachable(); folly::assume_unreachable();
} }