diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index f7829b002..93d1df5df 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -29,6 +29,8 @@ class EventBase; namespace quic { +class DSRPacketizationRequestSender; + class QuicSocket { public: /** @@ -1003,6 +1005,13 @@ class QuicSocket { bool eof, DeliveryCallback* cb = nullptr) = 0; + /** + * Set the DSRPacketizationRequestSender for a stream. + */ + virtual WriteResult setDSRPacketizationRequestSender( + StreamId id, + std::unique_ptr sender) = 0; + /** * Register a callback to be invoked when the peer has acknowledged the * given offset on the given stream. diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index be641aede..f7526b3fd 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -2005,6 +2005,9 @@ QuicSocket::WriteResult QuicTransportBase::writeBufMeta( if (!stream->writable()) { return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED); } + if (!stream->dsrSender) { + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } if (stream->currentWriteOffset == 0 && stream->writeBuffer.empty()) { // If nothing has been written to writeBuffer ever, meta writing isn't // allowed. @@ -3176,4 +3179,56 @@ void QuicTransportBase::onTransportKnobs(Buf knobBlob) { knobBlob->length()); } +QuicSocket::WriteResult QuicTransportBase::setDSRPacketizationRequestSender( + StreamId id, + std::unique_ptr sender) { + if (closeState_ != CloseState::OPEN) { + return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); + } + if (isReceivingStream(conn_->nodeType, id)) { + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + FOLLY_MAYBE_UNUSED auto self = sharedGuard(); + try { + // Check whether stream exists before calling getStream to avoid + // creating a peer stream if it does not exist yet. + if (!conn_->streamManager->streamExists(id)) { + return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); + } + auto stream = conn_->streamManager->getStream(id); + if (!stream->writable()) { + return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED); + } + if (stream->dsrSender != nullptr) { + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + stream->dsrSender = std::move(sender); + // Fow now, no appLimited or appIdle update here since we are not writing + // either BufferMetas yet. The first BufferMeta write will update it. + } catch (const QuicTransportException& ex) { + VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " " + << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(std::make_pair( + QuicErrorCode(ex.errorCode()), std::string("writeChain() error"))); + return folly::makeUnexpected(LocalErrorCode::TRANSPORT_ERROR); + } catch (const QuicInternalException& ex) { + VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " " + << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(std::make_pair( + QuicErrorCode(ex.errorCode()), std::string("writeChain() error"))); + return folly::makeUnexpected(ex.errorCode()); + } catch (const std::exception& ex) { + VLOG(4) << __func__ << " streamId=" << id << " " << ex.what() << " " + << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(std::make_pair( + QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), + std::string("writeChain() error"))); + return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR); + } + return folly::unit; +} + } // namespace quic diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index 5e8cb6ce4..1eb08adfb 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -178,12 +178,18 @@ class QuicTransportBase : public QuicSocket { bool eof, DeliveryCallback* cb = nullptr) override; + // TODO: Maybe I should virtualize DSR related APIs and only implement in + // QuicServerTransport WriteResult writeBufMeta( StreamId id, const BufferMeta& data, bool eof, DeliveryCallback* cb = nullptr) override; + WriteResult setDSRPacketizationRequestSender( + StreamId id, + std::unique_ptr sender) override; + folly::Expected registerDeliveryCallback( StreamId id, uint64_t offset, diff --git a/quic/api/test/MockQuicSocket.h b/quic/api/test/MockQuicSocket.h index e5328f38e..bb42496f3 100644 --- a/quic/api/test/MockQuicSocket.h +++ b/quic/api/test/MockQuicSocket.h @@ -201,6 +201,16 @@ class MockQuicSocket : public QuicSocket { MOCK_METHOD4( writeBufMeta, WriteResult(StreamId, const BufferMeta&, bool, DeliveryCallback*)); + MOCK_METHOD2( + setDSRPacketizationRequestSenderRef, + WriteResult( + StreamId, + const std::unique_ptr&)); + WriteResult setDSRPacketizationRequestSender( + StreamId streamId, + std::unique_ptr sender) override { + return setDSRPacketizationRequestSenderRef(streamId, sender); + } MOCK_METHOD3( registerDeliveryCallback, folly::Expected( diff --git a/quic/api/test/QuicTransportTest.cpp b/quic/api/test/QuicTransportTest.cpp index 1db2a02ea..595f0d494 100644 --- a/quic/api/test/QuicTransportTest.cpp +++ b/quic/api/test/QuicTransportTest.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -3409,11 +3410,13 @@ TEST_F(QuicTransportTest, PrioritySetAndGet) { EXPECT_EQ(LocalErrorCode::CONNECTION_CLOSED, closedConnStreamPri.error()); } -TEST_F(QuicTransportTest, WriteBufMetaIntoStream) { +TEST_F(QuicTransportTest, SetDSRSenderAndWriteBufMetaIntoStream) { auto streamId = transport_->createBidirectionalStream().value(); size_t bufferLength = 2000; BufferMeta meta(bufferLength); auto buf = buildRandomInputData(20); + auto dsrSender = std::make_unique(); + transport_->setDSRPacketizationRequestSender(streamId, std::move(dsrSender)); // Some amount of real data needs to be written first: transport_->writeChain(streamId, std::move(buf), false); transport_->writeBufMeta(streamId, meta, true); diff --git a/quic/dsr/CMakeLists.txt b/quic/dsr/CMakeLists.txt new file mode 100644 index 000000000..a4923480c --- /dev/null +++ b/quic/dsr/CMakeLists.txt @@ -0,0 +1,45 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +add_library( + mvfst_dsr_types STATIC + Types.cpp +) + +target_include_directories( + mvfst_dsr_types PUBLIC + $ + $ +) + +add_dependencies( + mvfst_dsr_types + mvfst_codec_types +) + +target_libk_libraries( + mvfst_dsr_types PUBLIC + Folly::folly + mvfst_codec_types +) + +file( + GLOB_RECURSE QUIC_API_HEADERS_TOINSTALL + RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} + *.h +) +list(FILTER QUIC_API_HEADERS_TOINSTALL EXCLUDE REGEX test/) +foreach(header ${QUIC_API_HEADERS_TOINSTALL}) + get_filename_component(header_dir ${header} DIRECTORY) + install(FILES ${header} DESTINATION include/quic/dsr/${header_dir}) +endforeach() + +install( + TARGETS mvfst_dsr_types + EXPORT mvfst-exports + DESTINATION lib +) + +add_subdirectory(test) diff --git a/quic/dsr/test/Mocks.h b/quic/dsr/test/Mocks.h index decae09dc..1c31c642b 100644 --- a/quic/dsr/test/Mocks.h +++ b/quic/dsr/test/Mocks.h @@ -22,7 +22,13 @@ class MockDSRPacketBuilder : public DSRPacketBuilderBase { return const_cast(*this).remainingSpaceNonConst(); } - MOCK_METHOD2(addSendInstruction, void(SendInstruction, uint32_t)); + MOCK_METHOD2(addSendInstructionPtr, void(const SendInstruction*, uint32_t)); + + void addSendInstruction( + SendInstruction instruction, + uint32_t streamEncodedSize) override { + addSendInstructionPtr(&instruction, streamEncodedSize); + } }; class MockDSRPacketizationRequestSender : public DSRPacketizationRequestSender { diff --git a/quic/dsr/test/SchedulerTest.cpp b/quic/dsr/test/SchedulerTest.cpp index 3981ef59a..90a92e6a6 100644 --- a/quic/dsr/test/SchedulerTest.cpp +++ b/quic/dsr/test/SchedulerTest.cpp @@ -53,12 +53,12 @@ TEST_F(SchedulerTest, ScheduleStream) { conn_.streamManager->hasDSRWritable()); EXPECT_TRUE(scheduler.hasPendingData()); EXPECT_CALL(builder_, remainingSpaceNonConst()).WillRepeatedly(Return(1000)); - EXPECT_CALL(builder_, addSendInstruction(_, _)) - .WillOnce(Invoke([&](SendInstruction instruction, uint32_t) { - EXPECT_EQ(stream->id, (size_t)instruction.streamId); - EXPECT_EQ(expectedBufMetaOffset, instruction.offset); - EXPECT_EQ(200, instruction.len); - EXPECT_TRUE(instruction.fin); + EXPECT_CALL(builder_, addSendInstructionPtr(_, _)) + .WillOnce(Invoke([&](const SendInstruction* instruction, uint32_t) { + EXPECT_EQ(stream->id, (size_t)instruction->streamId); + EXPECT_EQ(expectedBufMetaOffset, instruction->offset); + EXPECT_EQ(200, instruction->len); + EXPECT_TRUE(instruction->fin); })); EXPECT_TRUE(scheduler.writeStream(builder_)); diff --git a/quic/state/StreamData.h b/quic/state/StreamData.h index 5cc40b3f3..e1e9b6f59 100644 --- a/quic/state/StreamData.h +++ b/quic/state/StreamData.h @@ -12,6 +12,7 @@ #include #include #include +#include #include namespace quic { @@ -354,5 +355,7 @@ struct QuicStreamState : public QuicStreamLike { bool hasPeekableData() const { return readBuffer.size() > 0; } + + std::unique_ptr dsrSender; }; } // namespace quic