1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-06 22:22:38 +03:00

Write a buffer's meta data into QUIC

Summary:
Instead of writing real data into the transport, we want to support a
use case where only its metadata is written to the transport. Sending of the
real data is delegated to another entity in such setup.

Reviewed By: mjoras

Differential Revision: D26131772

fbshipit-source-id: 4fcfa3a1626203f63c61898e6de089a3079d043d
This commit is contained in:
Yang Chi
2021-03-03 23:48:14 -08:00
committed by Facebook GitHub Bot
parent 968b39f4a9
commit adc1e15eff
8 changed files with 229 additions and 0 deletions

View File

@@ -955,6 +955,15 @@ class QuicSocket {
bool eof,
DeliveryCallback* cb = nullptr) = 0;
/**
* Write a data representation in the form of BufferMeta to the given stream.
*/
virtual WriteResult writeBufMeta(
StreamId id,
const BufferMeta& data,
bool eof,
DeliveryCallback* cb = nullptr) = 0;
/**
* Register a callback to be invoked when the peer has acknowledged the
* given offset on the given stream.

View File

@@ -1968,6 +1968,79 @@ QuicSocket::WriteResult QuicTransportBase::writeChain(
return folly::unit;
}
QuicSocket::WriteResult QuicTransportBase::writeBufMeta(
StreamId id,
const BufferMeta& data,
bool eof,
DeliveryCallback* cb) {
if (isReceivingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
FOLLY_MAYBE_UNUSED auto self = sharedGuard();
try {
// Check whether stream exists before calling getStream to avoid
// creating a peer stream if it does not exist yet.
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id);
if (!stream->writable()) {
return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED);
}
if (stream->currentWriteOffset == 0 && stream->writeBuffer.empty()) {
// If nothing has been written to writeBuffer ever, meta writing isn't
// allowed.
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
// Register DeliveryCallback for the data + eof offset.
if (cb) {
auto dataLength = data.length + (eof ? 1 : 0);
if (dataLength) {
auto currentLargestWriteOffset = getLargestWriteOffsetSeen(*stream);
registerDeliveryCallback(
id, currentLargestWriteOffset + dataLength - 1, cb);
}
}
bool wasAppLimitedOrIdle = false;
if (conn_->congestionController) {
wasAppLimitedOrIdle = conn_->congestionController->isAppLimited();
wasAppLimitedOrIdle |= conn_->streamManager->isAppIdle();
}
writeBufMetaToQuicStream(*stream, data, eof);
// If we were previously app limited restart pacing with the current rate.
if (wasAppLimitedOrIdle && conn_->pacer) {
conn_->pacer->reset();
}
updateWriteLooper(true);
} catch (const QuicTransportException& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(std::make_pair(
QuicErrorCode(ex.errorCode()), std::string("writeChain() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(std::make_pair(
QuicErrorCode(ex.errorCode()), std::string("writeChain() error")));
return folly::makeUnexpected(ex.errorCode());
} catch (const std::exception& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(std::make_pair(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("writeChain() error")));
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
return folly::unit;
}
folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::registerDeliveryCallback(
StreamId id,

View File

@@ -178,6 +178,12 @@ class QuicTransportBase : public QuicSocket {
bool eof,
DeliveryCallback* cb = nullptr) override;
WriteResult writeBufMeta(
StreamId id,
const BufferMeta& data,
bool eof,
DeliveryCallback* cb = nullptr) override;
folly::Expected<folly::Unit, LocalErrorCode> registerDeliveryCallback(
StreamId id,
uint64_t offset,

View File

@@ -198,6 +198,9 @@ class MockQuicSocket : public QuicSocket {
MOCK_METHOD4(
writeChain,
WriteResult(StreamId, SharedBuf, bool, DeliveryCallback*));
MOCK_METHOD4(
writeBufMeta,
WriteResult(StreamId, const BufferMeta&, bool, DeliveryCallback*));
MOCK_METHOD3(
registerDeliveryCallback,
folly::Expected<folly::Unit, LocalErrorCode>(

View File

@@ -3279,5 +3279,31 @@ TEST_F(QuicTransportTest, PrioritySetAndGet) {
EXPECT_EQ(LocalErrorCode::CONNECTION_CLOSED, closedConnStreamPri.error());
}
TEST_F(QuicTransportTest, WriteBufMetaIntoStream) {
auto streamId = transport_->createBidirectionalStream().value();
size_t bufferLength = 2000;
BufferMeta meta(bufferLength);
auto buf = buildRandomInputData(20);
// Some amount of real data needs to be written first:
transport_->writeChain(streamId, std::move(buf), false);
transport_->writeBufMeta(streamId, meta, true);
auto& stream =
*transport_->getConnectionState().streamManager->findStream(streamId);
EXPECT_GE(stream.writeBufMeta.offset, 20);
EXPECT_EQ(stream.writeBufMeta.length, bufferLength);
EXPECT_TRUE(stream.writeBufMeta.eof);
EXPECT_EQ(
*stream.finalWriteOffset,
stream.writeBufMeta.offset + stream.writeBufMeta.length);
}
TEST_F(QuicTransportTest, WriteBufMetaWithoutRealData) {
auto streamId = transport_->createBidirectionalStream().value();
size_t bufferLength = 2000;
BufferMeta meta(bufferLength);
auto result = transport_->writeBufMeta(streamId, meta, true);
EXPECT_TRUE(result.hasError());
}
} // namespace test
} // namespace quic

View File

@@ -29,6 +29,9 @@ void prependToBuf(quic::Buf& buf, quic::Buf toAppend) {
namespace quic {
void writeDataToQuicStream(QuicStreamState& stream, Buf data, bool eof) {
// Once data is written to writeBufMeta, no more data can be written to
// writeBuffer.
CHECK_EQ(0, stream.writeBufMeta.offset);
uint64_t len = 0;
if (data) {
len = data->computeChainDataLength();
@@ -48,6 +51,28 @@ void writeDataToQuicStream(QuicStreamState& stream, Buf data, bool eof) {
stream.conn.streamManager->updateWritableStreams(stream);
}
void writeBufMetaToQuicStream(
QuicStreamState& stream,
const BufferMeta& data,
bool eof) {
if (data.length > 0) {
maybeWriteBlockAfterAPIWrite(stream);
}
if (stream.writeBufMeta.offset == 0) {
CHECK(!stream.finalWriteOffset.has_value());
stream.writeBufMeta.offset =
stream.currentWriteOffset + stream.writeBuffer.chainLength();
}
stream.writeBufMeta.length += data.length;
if (eof) {
stream.finalWriteOffset =
stream.writeBufMeta.offset + stream.writeBufMeta.length;
stream.writeBufMeta.eof = true;
}
updateFlowControlOnWriteToStream(stream, data.length);
stream.conn.streamManager->updateWritableStreams(stream);
}
void writeDataToQuicStream(QuicCryptoStream& stream, Buf data) {
stream.writeBuffer.append(std::move(data));
}

View File

@@ -21,6 +21,17 @@ namespace quic {
*/
void writeDataToQuicStream(QuicStreamState& stream, Buf data, bool eof);
/**
* Adds data represented in the form of BufferMeta to the end of the Buffer
* Meta queue of the stream.
*
* TODO: move to dsr directory.
*/
void writeBufMetaToQuicStream(
QuicStreamState& stream,
const BufferMeta& data,
bool eof);
/**
* Adds data to the end of the write buffer of the QUIC crypto stream. This
* data will be written onto the socket.

View File

@@ -16,6 +16,71 @@
namespace quic {
/**
* A buffer representation without the actual data. This is part of the public
* facing interface.
*
* This is experimental.
*/
struct BufferMeta {
size_t length;
explicit BufferMeta(size_t lengthIn) : length(lengthIn) {}
};
/**
* A write buffer representation without the actual data. This is used for
* write buffer management in a stream.
*
* This is experimental.
*/
struct WriteBufferMeta {
size_t length{0};
size_t offset{0};
bool eof{false};
WriteBufferMeta() = default;
struct Builder {
Builder& setLength(size_t lengthIn) {
length_ = lengthIn;
return *this;
}
Builder& setOffset(size_t offsetIn) {
offset_ = offsetIn;
return *this;
}
Builder& setEOF(bool val) {
eof_ = val;
return *this;
}
WriteBufferMeta build() {
return WriteBufferMeta(length_, offset_, eof_);
}
private:
size_t length_{0};
size_t offset_{0};
bool eof_{false};
};
WriteBufferMeta split(size_t splitLen) {
CHECK_GE(length, splitLen);
auto splitEof = splitLen == length && eof;
WriteBufferMeta splitOf(splitLen, offset, splitEof);
offset += splitLen;
length -= splitLen;
return splitOf;
}
private:
explicit WriteBufferMeta(size_t lengthIn, size_t offsetIn, bool eofIn)
: length(lengthIn), offset(offsetIn), eof(eofIn) {}
};
struct StreamBuffer {
BufQueue data;
uint64_t offset;
@@ -97,6 +162,17 @@ struct QuicStreamLike {
// egress packets that contains a *new* STREAM frame for this stream.
uint64_t numPacketsTxWithNewData{0};
// BufferMeta that has been writen to the QUIC layer.
// When offset is 0, nothing has been written to it. On first write, its
// starting offset will be currentWriteOffset + writeBuffer.chainLength().
WriteBufferMeta writeBufMeta;
// A map to store sent WriteBufferMetas for potential retransmission.
folly::F14FastMap<uint64_t, WriteBufferMeta> retransmissionBufMetas;
// WriteBufferMetas that's already marked lost. They will be retransmitted.
std::deque<WriteBufferMeta> lossBufMetas;
/*
* Either insert a new entry into the loss buffer, or merge the buffer with
* an existing entry.