From adc1e15effd208d5d42266e5d45a4450afa61a9c Mon Sep 17 00:00:00 2001 From: Yang Chi Date: Wed, 3 Mar 2021 23:48:14 -0800 Subject: [PATCH] Write a buffer's meta data into QUIC Summary: Instead of writing real data into the transport, we want to support a use case where only its metadata is written to the transport. Sending of the real data is delegated to another entity in such setup. Reviewed By: mjoras Differential Revision: D26131772 fbshipit-source-id: 4fcfa3a1626203f63c61898e6de089a3079d043d --- quic/api/QuicSocket.h | 9 ++++ quic/api/QuicTransportBase.cpp | 73 +++++++++++++++++++++++++++ quic/api/QuicTransportBase.h | 6 +++ quic/api/test/MockQuicSocket.h | 3 ++ quic/api/test/QuicTransportTest.cpp | 26 ++++++++++ quic/state/QuicStreamFunctions.cpp | 25 ++++++++++ quic/state/QuicStreamFunctions.h | 11 +++++ quic/state/StreamData.h | 76 +++++++++++++++++++++++++++++ 8 files changed, 229 insertions(+) diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index eec851981..9e17ef82b 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -955,6 +955,15 @@ class QuicSocket { bool eof, DeliveryCallback* cb = nullptr) = 0; + /** + * Write a data representation in the form of BufferMeta to the given stream. + */ + virtual WriteResult writeBufMeta( + StreamId id, + const BufferMeta& data, + bool eof, + DeliveryCallback* cb = nullptr) = 0; + /** * Register a callback to be invoked when the peer has acknowledged the * given offset on the given stream. diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 8c460fb13..257bd30ca 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1968,6 +1968,79 @@ QuicSocket::WriteResult QuicTransportBase::writeChain( return folly::unit; } +QuicSocket::WriteResult QuicTransportBase::writeBufMeta( + StreamId id, + const BufferMeta& data, + bool eof, + DeliveryCallback* cb) { + if (isReceivingStream(conn_->nodeType, id)) { + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + if (closeState_ != CloseState::OPEN) { + return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); + } + FOLLY_MAYBE_UNUSED auto self = sharedGuard(); + try { + // Check whether stream exists before calling getStream to avoid + // creating a peer stream if it does not exist yet. + if (!conn_->streamManager->streamExists(id)) { + return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); + } + auto stream = conn_->streamManager->getStream(id); + if (!stream->writable()) { + return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED); + } + if (stream->currentWriteOffset == 0 && stream->writeBuffer.empty()) { + // If nothing has been written to writeBuffer ever, meta writing isn't + // allowed. + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + // Register DeliveryCallback for the data + eof offset. + if (cb) { + auto dataLength = data.length + (eof ? 1 : 0); + if (dataLength) { + auto currentLargestWriteOffset = getLargestWriteOffsetSeen(*stream); + registerDeliveryCallback( + id, currentLargestWriteOffset + dataLength - 1, cb); + } + } + bool wasAppLimitedOrIdle = false; + if (conn_->congestionController) { + wasAppLimitedOrIdle = conn_->congestionController->isAppLimited(); + wasAppLimitedOrIdle |= conn_->streamManager->isAppIdle(); + } + writeBufMetaToQuicStream(*stream, data, eof); + // If we were previously app limited restart pacing with the current rate. + if (wasAppLimitedOrIdle && conn_->pacer) { + conn_->pacer->reset(); + } + updateWriteLooper(true); + } catch (const QuicTransportException& ex) { + VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " " + << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(std::make_pair( + QuicErrorCode(ex.errorCode()), std::string("writeChain() error"))); + return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR); + } catch (const QuicInternalException& ex) { + VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " " + << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(std::make_pair( + QuicErrorCode(ex.errorCode()), std::string("writeChain() error"))); + return folly::makeUnexpected(ex.errorCode()); + } catch (const std::exception& ex) { + VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " " + << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(std::make_pair( + QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), + std::string("writeChain() error"))); + return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR); + } + return folly::unit; +} + folly::Expected QuicTransportBase::registerDeliveryCallback( StreamId id, diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index a9fe709d6..5e8cb6ce4 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -178,6 +178,12 @@ class QuicTransportBase : public QuicSocket { bool eof, DeliveryCallback* cb = nullptr) override; + WriteResult writeBufMeta( + StreamId id, + const BufferMeta& data, + bool eof, + DeliveryCallback* cb = nullptr) override; + folly::Expected registerDeliveryCallback( StreamId id, uint64_t offset, diff --git a/quic/api/test/MockQuicSocket.h b/quic/api/test/MockQuicSocket.h index 6adc21fd1..e5328f38e 100644 --- a/quic/api/test/MockQuicSocket.h +++ b/quic/api/test/MockQuicSocket.h @@ -198,6 +198,9 @@ class MockQuicSocket : public QuicSocket { MOCK_METHOD4( writeChain, WriteResult(StreamId, SharedBuf, bool, DeliveryCallback*)); + MOCK_METHOD4( + writeBufMeta, + WriteResult(StreamId, const BufferMeta&, bool, DeliveryCallback*)); MOCK_METHOD3( registerDeliveryCallback, folly::Expected( diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index f14b912cb..a76be1504 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -3279,5 +3279,31 @@ TEST_F(QuicTransportTest, PrioritySetAndGet) { EXPECT_EQ(LocalErrorCode::CONNECTION_CLOSED, closedConnStreamPri.error()); } +TEST_F(QuicTransportTest, WriteBufMetaIntoStream) { + auto streamId = transport_->createBidirectionalStream().value(); + size_t bufferLength = 2000; + BufferMeta meta(bufferLength); + auto buf = buildRandomInputData(20); + // Some amount of real data needs to be written first: + transport_->writeChain(streamId, std::move(buf), false); + transport_->writeBufMeta(streamId, meta, true); + auto& stream = + *transport_->getConnectionState().streamManager->findStream(streamId); + EXPECT_GE(stream.writeBufMeta.offset, 20); + EXPECT_EQ(stream.writeBufMeta.length, bufferLength); + EXPECT_TRUE(stream.writeBufMeta.eof); + EXPECT_EQ( + *stream.finalWriteOffset, + stream.writeBufMeta.offset + stream.writeBufMeta.length); +} + +TEST_F(QuicTransportTest, WriteBufMetaWithoutRealData) { + auto streamId = transport_->createBidirectionalStream().value(); + size_t bufferLength = 2000; + BufferMeta meta(bufferLength); + auto result = transport_->writeBufMeta(streamId, meta, true); + EXPECT_TRUE(result.hasError()); +} + } // namespace test } // namespace quic diff --git a/quic/state/QuicStreamFunctions.cpp b/quic/state/QuicStreamFunctions.cpp index 566b3b6df..668ad2ea9 100644 --- a/quic/state/QuicStreamFunctions.cpp +++ b/quic/state/QuicStreamFunctions.cpp @@ -29,6 +29,9 @@ void prependToBuf(quic::Buf& buf, quic::Buf toAppend) { namespace quic { void writeDataToQuicStream(QuicStreamState& stream, Buf data, bool eof) { + // Once data is written to writeBufMeta, no more data can be written to + // writeBuffer. + CHECK_EQ(0, stream.writeBufMeta.offset); uint64_t len = 0; if (data) { len = data->computeChainDataLength(); @@ -48,6 +51,28 @@ void writeDataToQuicStream(QuicStreamState& stream, Buf data, bool eof) { stream.conn.streamManager->updateWritableStreams(stream); } +void writeBufMetaToQuicStream( + QuicStreamState& stream, + const BufferMeta& data, + bool eof) { + if (data.length > 0) { + maybeWriteBlockAfterAPIWrite(stream); + } + if (stream.writeBufMeta.offset == 0) { + CHECK(!stream.finalWriteOffset.has_value()); + stream.writeBufMeta.offset = + stream.currentWriteOffset + stream.writeBuffer.chainLength(); + } + stream.writeBufMeta.length += data.length; + if (eof) { + stream.finalWriteOffset = + stream.writeBufMeta.offset + stream.writeBufMeta.length; + stream.writeBufMeta.eof = true; + } + updateFlowControlOnWriteToStream(stream, data.length); + stream.conn.streamManager->updateWritableStreams(stream); +} + void writeDataToQuicStream(QuicCryptoStream& stream, Buf data) { stream.writeBuffer.append(std::move(data)); } diff --git a/quic/state/QuicStreamFunctions.h b/quic/state/QuicStreamFunctions.h index cca10e407..6682f6ac1 100644 --- a/quic/state/QuicStreamFunctions.h +++ b/quic/state/QuicStreamFunctions.h @@ -21,6 +21,17 @@ namespace quic { */ void writeDataToQuicStream(QuicStreamState& stream, Buf data, bool eof); +/** + * Adds data represented in the form of BufferMeta to the end of the Buffer + * Meta queue of the stream. + * + * TODO: move to dsr directory. + */ +void writeBufMetaToQuicStream( + QuicStreamState& stream, + const BufferMeta& data, + bool eof); + /** * Adds data to the end of the write buffer of the QUIC crypto stream. This * data will be written onto the socket. diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index 224564334..2102b7cf1 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -16,6 +16,71 @@ namespace quic { +/** + * A buffer representation without the actual data. This is part of the public + * facing interface. + * + * This is experimental. + */ +struct BufferMeta { + size_t length; + + explicit BufferMeta(size_t lengthIn) : length(lengthIn) {} +}; + +/** + * A write buffer representation without the actual data. This is used for + * write buffer management in a stream. + * + * This is experimental. + */ +struct WriteBufferMeta { + size_t length{0}; + size_t offset{0}; + bool eof{false}; + + WriteBufferMeta() = default; + + struct Builder { + Builder& setLength(size_t lengthIn) { + length_ = lengthIn; + return *this; + } + + Builder& setOffset(size_t offsetIn) { + offset_ = offsetIn; + return *this; + } + + Builder& setEOF(bool val) { + eof_ = val; + return *this; + } + + WriteBufferMeta build() { + return WriteBufferMeta(length_, offset_, eof_); + } + + private: + size_t length_{0}; + size_t offset_{0}; + bool eof_{false}; + }; + + WriteBufferMeta split(size_t splitLen) { + CHECK_GE(length, splitLen); + auto splitEof = splitLen == length && eof; + WriteBufferMeta splitOf(splitLen, offset, splitEof); + offset += splitLen; + length -= splitLen; + return splitOf; + } + + private: + explicit WriteBufferMeta(size_t lengthIn, size_t offsetIn, bool eofIn) + : length(lengthIn), offset(offsetIn), eof(eofIn) {} +}; + struct StreamBuffer { BufQueue data; uint64_t offset; @@ -97,6 +162,17 @@ struct QuicStreamLike { // egress packets that contains a *new* STREAM frame for this stream. uint64_t numPacketsTxWithNewData{0}; + // BufferMeta that has been writen to the QUIC layer. + // When offset is 0, nothing has been written to it. On first write, its + // starting offset will be currentWriteOffset + writeBuffer.chainLength(). + WriteBufferMeta writeBufMeta; + + // A map to store sent WriteBufferMetas for potential retransmission. + folly::F14FastMap retransmissionBufMetas; + + // WriteBufferMetas that's already marked lost. They will be retransmitted. + std::deque lossBufMetas; + /* * Either insert a new entry into the loss buffer, or merge the buffer with * an existing entry.