diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index a4e3c61bb..550cc50ad 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -481,8 +481,10 @@ bool StreamFrameScheduler::writeStreamFrame( uint64_t flowControlLen = std::min(getSendStreamFlowControlBytesWire(stream), connWritableBytes); uint64_t bufferLen = stream.writeBuffer.chainLength(); - bool canWriteFin = - stream.finalWriteOffset.has_value() && bufferLen <= flowControlLen; + // We can't write FIN directly from here if writeBufMeta has pending bytes to + // send. + bool canWriteFin = stream.finalWriteOffset.has_value() && + bufferLen <= flowControlLen && stream.writeBufMeta.length == 0; auto dataLen = writeStreamFrameHeader( builder, stream.id, diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index 0ca81b2c7..c62e61f73 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include #include @@ -1739,6 +1741,39 @@ TEST_F(QuicPacketSchedulerTest, RunOutFlowControlDuringStreamWrite) { EXPECT_EQ(200, stream2->retransmissionBuffer[0]->data.chainLength()); } +TEST_F(QuicPacketSchedulerTest, NoFinWhenThereIsPendingWriteBuf) { + QuicServerConnectionState conn( + FizzServerQuicHandshakeContext::Builder().build()); + conn.streamManager->setMaxLocalBidirectionalStreams(10); + conn.flowControlState.peerAdvertisedMaxOffset = 100000; + auto* stream = *(conn.streamManager->createNextBidirectionalStream()); + stream->flowControlState.peerAdvertisedMaxOffset = 100000; + + writeDataToQuicStream(*stream, folly::IOBuf::copyBuffer("Ascent"), false); + stream->dsrSender = std::make_unique(); + BufferMeta bufferMeta(5000); + writeBufMetaToQuicStream(*stream, bufferMeta, true); + EXPECT_TRUE(stream->finalWriteOffset.hasValue()); + PacketNum packetNum = 0; + ShortHeader header( + ProtectionType::KeyPhaseOne, + conn.clientConnectionId.value_or(getTestConnectionId()), + packetNum); + RegularQuicPacketBuilder builder( + conn.udpSendPacketLen, + std::move(header), + conn.ackStates.appDataAckState.largestAckedByPeer.value_or(0)); + builder.encodePacketHeader(); + StreamFrameScheduler scheduler(conn); + scheduler.writeStreams(builder); + auto packet = std::move(builder).buildPacket().packet; + EXPECT_EQ(1, packet.frames.size()); + auto streamFrame = *packet.frames[0].asWriteStreamFrame(); + EXPECT_EQ(streamFrame.len, 6); + EXPECT_EQ(streamFrame.offset, 0); + EXPECT_FALSE(streamFrame.fin); +} + INSTANTIATE_TEST_CASE_P( QuicPacketSchedulerTests, QuicPacketSchedulerTest, diff --git a/quic/dsr/Scheduler.cpp b/quic/dsr/Scheduler.cpp index 5d2799a1f..3652c41d2 100644 --- a/quic/dsr/Scheduler.cpp +++ b/quic/dsr/Scheduler.cpp @@ -71,12 +71,21 @@ DSRStreamFrameScheduler::SchedulingResult DSRStreamFrameScheduler::writeStream( if (!hasFreshBufMeta || builder.remainingSpace() == 0) { return result; } + // If we have fresh BufMeta to write, the offset cannot be 0. This is based on + // the current limit that some real data has to be written into the stream + // before BufMetas. + CHECK_NE(stream->writeBufMeta.offset, 0); uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_); if (connWritableBytes == 0) { return result; } - auto flowControlLen = - std::min(getSendStreamFlowControlBytesWire(*stream), connWritableBytes); + // When stream still has writeBuffer, getSendStreamFlowControlBytesWire counts + // from currentWriteOffset which isn't right for BufMetas. + auto streamFlowControlLen = std::min( + getSendStreamFlowControlBytesWire(*stream), + stream->flowControlState.peerAdvertisedMaxOffset - + stream->writeBufMeta.offset); + auto flowControlLen = std::min(streamFlowControlLen, connWritableBytes); bool canWriteFin = stream->finalWriteOffset.has_value() && stream->writeBufMeta.length <= flowControlLen; SendInstruction::Builder instructionBuilder(conn_, *streamId); diff --git a/quic/dsr/test/SchedulerTest.cpp b/quic/dsr/test/SchedulerTest.cpp index 3fc1b71e2..8d624dd3b 100644 --- a/quic/dsr/test/SchedulerTest.cpp +++ b/quic/dsr/test/SchedulerTest.cpp @@ -32,6 +32,7 @@ TEST_F(SchedulerTest, ScheduleStream) { DSRStreamFrameScheduler scheduler(conn_); EXPECT_FALSE(scheduler.hasPendingData()); auto stream = *conn_.streamManager->createNextBidirectionalStream(); + stream->flowControlState.peerAdvertisedMaxOffset = 200; stream->dsrSender = std::make_unique(); writeDataToQuicStream( *stream, folly::IOBuf::copyBuffer("New York Bagles"), false); @@ -43,22 +44,51 @@ TEST_F(SchedulerTest, ScheduleStream) { conn_.streamManager->hasDSRWritable()); EXPECT_TRUE(scheduler.hasPendingData()); EXPECT_CALL(builder_, remainingSpaceNonConst()).WillRepeatedly(Return(1000)); + uint64_t writtenLength = 0; EXPECT_CALL(builder_, addSendInstruction(_, _)) .WillOnce(Invoke([&](SendInstruction&& instruction, uint32_t) { EXPECT_EQ(stream->id, (size_t)instruction.streamId); EXPECT_EQ(expectedBufMetaOffset, instruction.offset); - EXPECT_EQ(200, instruction.len); - EXPECT_TRUE(instruction.fin); + EXPECT_GT(200, instruction.len); + writtenLength = instruction.len; + EXPECT_FALSE(instruction.fin); })); EXPECT_TRUE(scheduler.writeStream(builder_).writeSuccess); - auto writtenMeta = stream->writeBufMeta.split(200); - EXPECT_EQ(0, stream->writeBufMeta.length); - ++stream->writeBufMeta.offset; + auto writtenMeta = stream->writeBufMeta.split(writtenLength); + auto nextExpectedOffset = stream->writeBufMeta.offset; + EXPECT_GT(stream->writeBufMeta.length, 0); stream->retransmissionBufMetas.emplace( std::piecewise_construct, std::forward_as_tuple(expectedBufMetaOffset), std::forward_as_tuple(writtenMeta)); + // This is now flow control blocked: + EXPECT_FALSE(stream->hasWritableBufMeta()); + conn_.streamManager->updateWritableStreams(*stream); + EXPECT_FALSE(conn_.streamManager->hasDSRWritable()); + EXPECT_TRUE(conn_.streamManager->writableDSRStreams().empty()); + + stream->flowControlState.peerAdvertisedMaxOffset = 500; + conn_.streamManager->updateWritableStreams(*stream); + EXPECT_TRUE(conn_.streamManager->hasDSRWritable()); + EXPECT_FALSE(conn_.streamManager->writableDSRStreams().empty()); + EXPECT_CALL(builder_, addSendInstruction(_, _)) + .WillOnce(Invoke([&](SendInstruction&& instruction, uint32_t) { + EXPECT_EQ(stream->id, (size_t)instruction.streamId); + EXPECT_EQ(nextExpectedOffset, instruction.offset); + EXPECT_GT(instruction.len, 0); + writtenLength = instruction.len; + EXPECT_TRUE(instruction.fin); + })); + EXPECT_TRUE(scheduler.writeStream(builder_).writeSuccess); + + auto nextWrittenMeta = stream->writeBufMeta.split(writtenLength); + EXPECT_EQ(stream->writeBufMeta.length, 0); + stream->writeBufMeta.offset++; + stream->retransmissionBufMetas.emplace( + std::piecewise_construct, + std::forward_as_tuple(nextExpectedOffset), + std::forward_as_tuple(nextWrittenMeta)); EXPECT_FALSE(stream->hasWritableBufMeta()); conn_.streamManager->updateWritableStreams(*stream); EXPECT_FALSE(conn_.streamManager->hasDSRWritable()); diff --git a/quic/flowcontrol/QuicFlowController.cpp b/quic/flowcontrol/QuicFlowController.cpp index 093b3eb62..1c77cfd4a 100644 --- a/quic/flowcontrol/QuicFlowController.cpp +++ b/quic/flowcontrol/QuicFlowController.cpp @@ -217,7 +217,7 @@ void updateFlowControlOnResetStream(QuicStreamState& stream) { void maybeWriteBlockAfterAPIWrite(QuicStreamState& stream) { // Only write blocked when stream becomes blocked if (getSendStreamFlowControlBytesWire(stream) == 0 && - stream.writeBuffer.empty()) { + stream.writeBuffer.empty() && stream.writeBufMeta.length == 0) { stream.conn.streamManager->queueBlocked( stream.id, stream.flowControlState.peerAdvertisedMaxOffset); if (stream.conn.qLogger) { @@ -244,12 +244,14 @@ void maybeWriteDataBlockedAfterSocketWrite(QuicConnectionStateBase& conn) { void maybeWriteBlockAfterSocketWrite(QuicStreamState& stream) { // Only write blocked when the flow control bytes are used up and there are // still pending data - if (stream.finalWriteOffset && - *stream.finalWriteOffset < stream.currentWriteOffset) { + if (stream.streamWriteError) { + return; + } + if (stream.finalWriteOffset && stream.hasSentFIN()) { return; } if (getSendStreamFlowControlBytesWire(stream) == 0 && - !stream.writeBuffer.empty()) { + (!stream.writeBuffer.empty() || stream.writeBufMeta.length > 0)) { stream.conn.streamManager->queueBlocked( stream.id, stream.flowControlState.peerAdvertisedMaxOffset); if (stream.conn.qLogger) { @@ -273,7 +275,8 @@ void handleStreamWindowUpdate( if (stream.flowControlState.peerAdvertisedMaxOffset <= maximumData) { stream.flowControlState.peerAdvertisedMaxOffset = maximumData; if (stream.flowControlState.peerAdvertisedMaxOffset > - stream.currentWriteOffset + stream.writeBuffer.chainLength()) { + stream.currentWriteOffset + stream.writeBuffer.chainLength() + + stream.writeBufMeta.length) { updateFlowControlList(stream); } stream.conn.streamManager->updateWritableStreams(stream); @@ -323,14 +326,15 @@ void handleStreamBlocked(QuicStreamState& stream) { uint64_t getSendStreamFlowControlBytesWire(const QuicStreamState& stream) { DCHECK_GE( stream.flowControlState.peerAdvertisedMaxOffset, - stream.currentWriteOffset); + stream.nextOffsetToWrite()); return stream.flowControlState.peerAdvertisedMaxOffset - - stream.currentWriteOffset; + stream.nextOffsetToWrite(); } uint64_t getSendStreamFlowControlBytesAPI(const QuicStreamState& stream) { auto sendFlowControlBytes = getSendStreamFlowControlBytesWire(stream); - auto dataInBuffer = stream.writeBuffer.chainLength(); + auto dataInBuffer = + stream.writeBuffer.chainLength() + stream.writeBufMeta.length; if (dataInBuffer > sendFlowControlBytes) { return 0; } else { diff --git a/quic/flowcontrol/test/QuicFlowControlTest.cpp b/quic/flowcontrol/test/QuicFlowControlTest.cpp index f305e9db0..85a04ca17 100644 --- a/quic/flowcontrol/test/QuicFlowControlTest.cpp +++ b/quic/flowcontrol/test/QuicFlowControlTest.cpp @@ -318,6 +318,15 @@ TEST_F(QuicFlowControlTest, MaybeWriteBlockedAfterSocketWrite) { EXPECT_CALL(*transportInfoCb_, onStreamFlowControlBlocked()).Times(1); maybeWriteBlockAfterSocketWrite(stream); EXPECT_TRUE(conn_.streamManager->hasBlocked()); + + // No block if everything till FIN has been sent + conn_.streamManager->removeBlocked(id); + EXPECT_FALSE(conn_.streamManager->hasBlocked()); + stream.finalWriteOffset = + stream.currentWriteOffset + stream.writeBuffer.chainLength(); + stream.currentWriteOffset = *stream.finalWriteOffset + 1; + maybeWriteBlockAfterSocketWrite(stream); + EXPECT_FALSE(conn_.streamManager->hasBlocked()); } TEST_F(QuicFlowControlTest, MaybeSendStreamWindowUpdateChangeWindowLarger) { @@ -826,5 +835,26 @@ TEST_F(QuicFlowControlTest, OnStreamWindowUpdateSentWithoutPendingEvent) { EXPECT_FALSE(conn_.streamManager->pendingWindowUpdate(id)); } +TEST_F(QuicFlowControlTest, StreamFlowControlWithBufMeta) { + StreamId id = 0; + QuicStreamState stream(id, conn_); + stream.flowControlState.peerAdvertisedMaxOffset = 1000; + stream.currentWriteOffset = 200; + stream.writeBuffer.append(buildRandomInputData(100)); + EXPECT_EQ(800, getSendStreamFlowControlBytesWire(stream)); + EXPECT_EQ(700, getSendStreamFlowControlBytesAPI(stream)); + + stream.writeBufMeta.offset = + stream.currentWriteOffset + stream.writeBuffer.chainLength(); + stream.writeBufMeta.length = 300; + EXPECT_EQ(800, getSendStreamFlowControlBytesWire(stream)); + EXPECT_EQ(400, getSendStreamFlowControlBytesAPI(stream)); + + stream.currentWriteOffset += stream.writeBuffer.chainLength(); + stream.writeBuffer.move(); + EXPECT_EQ(700, getSendStreamFlowControlBytesWire(stream)); + EXPECT_EQ(400, getSendStreamFlowControlBytesAPI(stream)); +} + } // namespace test } // namespace quic diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index d2b3765e1..18f9154c2 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -134,8 +134,11 @@ struct QuicStreamLike { // Current offset of the start bytes in the write buffer. // This changes when we pop stuff off the writeBuffer. - // When we are finished writing out all the bytes until FIN, this will - // be one greater than finalWriteOffset. + // In a non-DSR stream, when we are finished writing out all the bytes until + // FIN, this will be one greater than finalWriteOffset. + // When DSR is used, this still points to the starting bytes in the write + // buffer. Its value won't change with WriteBufferMetas are appended and sent + // for a stream. uint64_t currentWriteOffset{0}; // the minimum offset requires retransmit @@ -329,6 +332,28 @@ struct QuicStreamState : public QuicStreamLike { return false; } + FOLLY_NODISCARD bool hasSentFIN() const { + if (!finalWriteOffset) { + return false; + } + return currentWriteOffset > *finalWriteOffset || + writeBufMeta.offset > *finalWriteOffset; + } + + FOLLY_NODISCARD uint64_t nextOffsetToWrite() const { + // The stream has never had WriteBufferMetas. Then currentWriteOffset + // always points to the next offset we send. This of course relies on the + // current contract of DSR: Real data always comes first. This code (and a + // lot other code) breaks when that contract is breached. + if (writeBufMeta.offset == 0) { + return currentWriteOffset; + } + if (!writeBuffer.empty()) { + return currentWriteOffset; + } + return writeBufMeta.offset; + } + bool hasReadableData() const { return (readBuffer.size() > 0 && currentReadOffset == readBuffer.front().offset) ||