1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-06 22:22:38 +03:00

QuicSocket::setDSRPacketizationRequestSender API

Summary:
This assigns a DSRPacketizationRequestSender to a QUIC stream, and let
it own it.

Reviewed By: mjoras

Differential Revision: D27668523

fbshipit-source-id: 4beba6ddf247801368c3e1c24a0a4956490d45cd
This commit is contained in:
Yang Chi
2021-04-20 20:07:39 -07:00
committed by Facebook GitHub Bot
parent c0cc396363
commit aa59f9ef32
9 changed files with 145 additions and 8 deletions

View File

@@ -29,6 +29,8 @@ class EventBase;
namespace quic { namespace quic {
class DSRPacketizationRequestSender;
class QuicSocket { class QuicSocket {
public: public:
/** /**
@@ -1003,6 +1005,13 @@ class QuicSocket {
bool eof, bool eof,
DeliveryCallback* cb = nullptr) = 0; DeliveryCallback* cb = nullptr) = 0;
/**
* Set the DSRPacketizationRequestSender for a stream.
*/
virtual WriteResult setDSRPacketizationRequestSender(
StreamId id,
std::unique_ptr<DSRPacketizationRequestSender> sender) = 0;
/** /**
* Register a callback to be invoked when the peer has acknowledged the * Register a callback to be invoked when the peer has acknowledged the
* given offset on the given stream. * given offset on the given stream.

View File

@@ -2005,6 +2005,9 @@ QuicSocket::WriteResult QuicTransportBase::writeBufMeta(
if (!stream->writable()) { if (!stream->writable()) {
return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED); return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED);
} }
if (!stream->dsrSender) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
if (stream->currentWriteOffset == 0 && stream->writeBuffer.empty()) { if (stream->currentWriteOffset == 0 && stream->writeBuffer.empty()) {
// If nothing has been written to writeBuffer ever, meta writing isn't // If nothing has been written to writeBuffer ever, meta writing isn't
// allowed. // allowed.
@@ -3176,4 +3179,56 @@ void QuicTransportBase::onTransportKnobs(Buf knobBlob) {
knobBlob->length()); knobBlob->length());
} }
QuicSocket::WriteResult QuicTransportBase::setDSRPacketizationRequestSender(
StreamId id,
std::unique_ptr<DSRPacketizationRequestSender> sender) {
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (isReceivingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
FOLLY_MAYBE_UNUSED auto self = sharedGuard();
try {
// Check whether stream exists before calling getStream to avoid
// creating a peer stream if it does not exist yet.
if (!conn_->streamManager->streamExists(id)) {
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
}
auto stream = conn_->streamManager->getStream(id);
if (!stream->writable()) {
return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED);
}
if (stream->dsrSender != nullptr) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
}
stream->dsrSender = std::move(sender);
// Fow now, no appLimited or appIdle update here since we are not writing
// either BufferMetas yet. The first BufferMeta write will update it.
} catch (const QuicTransportException& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(std::make_pair(
QuicErrorCode(ex.errorCode()), std::string("writeChain() error")));
return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR);
} catch (const QuicInternalException& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(std::make_pair(
QuicErrorCode(ex.errorCode()), std::string("writeChain() error")));
return folly::makeUnexpected(ex.errorCode());
} catch (const std::exception& ex) {
VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " "
<< *this;
exceptionCloseWhat_ = ex.what();
closeImpl(std::make_pair(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("writeChain() error")));
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
}
return folly::unit;
}
} // namespace quic } // namespace quic

View File

@@ -178,12 +178,18 @@ class QuicTransportBase : public QuicSocket {
bool eof, bool eof,
DeliveryCallback* cb = nullptr) override; DeliveryCallback* cb = nullptr) override;
// TODO: Maybe I should virtualize DSR related APIs and only implement in
// QuicServerTransport
WriteResult writeBufMeta( WriteResult writeBufMeta(
StreamId id, StreamId id,
const BufferMeta& data, const BufferMeta& data,
bool eof, bool eof,
DeliveryCallback* cb = nullptr) override; DeliveryCallback* cb = nullptr) override;
WriteResult setDSRPacketizationRequestSender(
StreamId id,
std::unique_ptr<DSRPacketizationRequestSender> sender) override;
folly::Expected<folly::Unit, LocalErrorCode> registerDeliveryCallback( folly::Expected<folly::Unit, LocalErrorCode> registerDeliveryCallback(
StreamId id, StreamId id,
uint64_t offset, uint64_t offset,

View File

@@ -201,6 +201,16 @@ class MockQuicSocket : public QuicSocket {
MOCK_METHOD4( MOCK_METHOD4(
writeBufMeta, writeBufMeta,
WriteResult(StreamId, const BufferMeta&, bool, DeliveryCallback*)); WriteResult(StreamId, const BufferMeta&, bool, DeliveryCallback*));
MOCK_METHOD2(
setDSRPacketizationRequestSenderRef,
WriteResult(
StreamId,
const std::unique_ptr<DSRPacketizationRequestSender>&));
WriteResult setDSRPacketizationRequestSender(
StreamId streamId,
std::unique_ptr<DSRPacketizationRequestSender> sender) override {
return setDSRPacketizationRequestSenderRef(streamId, sender);
}
MOCK_METHOD3( MOCK_METHOD3(
registerDeliveryCallback, registerDeliveryCallback,
folly::Expected<folly::Unit, LocalErrorCode>( folly::Expected<folly::Unit, LocalErrorCode>(

View File

@@ -19,6 +19,7 @@
#include <quic/common/BufUtil.h> #include <quic/common/BufUtil.h>
#include <quic/common/Timers.h> #include <quic/common/Timers.h>
#include <quic/common/test/TestUtils.h> #include <quic/common/test/TestUtils.h>
#include <quic/dsr/test/Mocks.h>
#include <quic/handshake/test/Mocks.h> #include <quic/handshake/test/Mocks.h>
#include <quic/logging/test/Mocks.h> #include <quic/logging/test/Mocks.h>
#include <quic/server/state/ServerStateMachine.h> #include <quic/server/state/ServerStateMachine.h>
@@ -3409,11 +3410,13 @@ TEST_F(QuicTransportTest, PrioritySetAndGet) {
EXPECT_EQ(LocalErrorCode::CONNECTION_CLOSED, closedConnStreamPri.error()); EXPECT_EQ(LocalErrorCode::CONNECTION_CLOSED, closedConnStreamPri.error());
} }
TEST_F(QuicTransportTest, WriteBufMetaIntoStream) { TEST_F(QuicTransportTest, SetDSRSenderAndWriteBufMetaIntoStream) {
auto streamId = transport_->createBidirectionalStream().value(); auto streamId = transport_->createBidirectionalStream().value();
size_t bufferLength = 2000; size_t bufferLength = 2000;
BufferMeta meta(bufferLength); BufferMeta meta(bufferLength);
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
auto dsrSender = std::make_unique<MockDSRPacketizationRequestSender>();
transport_->setDSRPacketizationRequestSender(streamId, std::move(dsrSender));
// Some amount of real data needs to be written first: // Some amount of real data needs to be written first:
transport_->writeChain(streamId, std::move(buf), false); transport_->writeChain(streamId, std::move(buf), false);
transport_->writeBufMeta(streamId, meta, true); transport_->writeBufMeta(streamId, meta, true);

45
quic/dsr/CMakeLists.txt Normal file
View File

@@ -0,0 +1,45 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
add_library(
mvfst_dsr_types STATIC
Types.cpp
)
target_include_directories(
mvfst_dsr_types PUBLIC
$<BUILD_INTERFACE:${QUIC_FBCODE_ROOT}>
$<INSTALL_INTERFACE:include/>
)
add_dependencies(
mvfst_dsr_types
mvfst_codec_types
)
target_libk_libraries(
mvfst_dsr_types PUBLIC
Folly::folly
mvfst_codec_types
)
file(
GLOB_RECURSE QUIC_API_HEADERS_TOINSTALL
RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
*.h
)
list(FILTER QUIC_API_HEADERS_TOINSTALL EXCLUDE REGEX test/)
foreach(header ${QUIC_API_HEADERS_TOINSTALL})
get_filename_component(header_dir ${header} DIRECTORY)
install(FILES ${header} DESTINATION include/quic/dsr/${header_dir})
endforeach()
install(
TARGETS mvfst_dsr_types
EXPORT mvfst-exports
DESTINATION lib
)
add_subdirectory(test)

View File

@@ -22,7 +22,13 @@ class MockDSRPacketBuilder : public DSRPacketBuilderBase {
return const_cast<MockDSRPacketBuilder&>(*this).remainingSpaceNonConst(); return const_cast<MockDSRPacketBuilder&>(*this).remainingSpaceNonConst();
} }
MOCK_METHOD2(addSendInstruction, void(SendInstruction, uint32_t)); MOCK_METHOD2(addSendInstructionPtr, void(const SendInstruction*, uint32_t));
void addSendInstruction(
SendInstruction instruction,
uint32_t streamEncodedSize) override {
addSendInstructionPtr(&instruction, streamEncodedSize);
}
}; };
class MockDSRPacketizationRequestSender : public DSRPacketizationRequestSender { class MockDSRPacketizationRequestSender : public DSRPacketizationRequestSender {

View File

@@ -53,12 +53,12 @@ TEST_F(SchedulerTest, ScheduleStream) {
conn_.streamManager->hasDSRWritable()); conn_.streamManager->hasDSRWritable());
EXPECT_TRUE(scheduler.hasPendingData()); EXPECT_TRUE(scheduler.hasPendingData());
EXPECT_CALL(builder_, remainingSpaceNonConst()).WillRepeatedly(Return(1000)); EXPECT_CALL(builder_, remainingSpaceNonConst()).WillRepeatedly(Return(1000));
EXPECT_CALL(builder_, addSendInstruction(_, _)) EXPECT_CALL(builder_, addSendInstructionPtr(_, _))
.WillOnce(Invoke([&](SendInstruction instruction, uint32_t) { .WillOnce(Invoke([&](const SendInstruction* instruction, uint32_t) {
EXPECT_EQ(stream->id, (size_t)instruction.streamId); EXPECT_EQ(stream->id, (size_t)instruction->streamId);
EXPECT_EQ(expectedBufMetaOffset, instruction.offset); EXPECT_EQ(expectedBufMetaOffset, instruction->offset);
EXPECT_EQ(200, instruction.len); EXPECT_EQ(200, instruction->len);
EXPECT_TRUE(instruction.fin); EXPECT_TRUE(instruction->fin);
})); }));
EXPECT_TRUE(scheduler.writeStream(builder_)); EXPECT_TRUE(scheduler.writeStream(builder_));

View File

@@ -12,6 +12,7 @@
#include <quic/QuicConstants.h> #include <quic/QuicConstants.h>
#include <quic/codec/Types.h> #include <quic/codec/Types.h>
#include <quic/common/SmallVec.h> #include <quic/common/SmallVec.h>
#include <quic/dsr/DSRPacketizationRequestSender.h>
#include <quic/state/QuicPriorityQueue.h> #include <quic/state/QuicPriorityQueue.h>
namespace quic { namespace quic {
@@ -354,5 +355,7 @@ struct QuicStreamState : public QuicStreamLike {
bool hasPeekableData() const { bool hasPeekableData() const {
return readBuffer.size() > 0; return readBuffer.size() > 0;
} }
std::unique_ptr<DSRPacketizationRequestSender> dsrSender;
}; };
} // namespace quic } // namespace quic