diff --git a/quic/QuicConstants.h b/quic/QuicConstants.h index 4e6283dc8..98b982d9c 100644 --- a/quic/QuicConstants.h +++ b/quic/QuicConstants.h @@ -487,4 +487,13 @@ enum class EncryptionLevel : uint8_t { AppData, }; +/** + * This is a temporary type used during our data path experiment. It may not + * exist for long time. + */ +enum class DataPathType : uint8_t { + ChainedMemory, + ContinuousMemory, +}; + } // namespace quic diff --git a/quic/api/QuicPacketScheduler.cpp b/quic/api/QuicPacketScheduler.cpp index 60039a9e6..20e2b1887 100644 --- a/quic/api/QuicPacketScheduler.cpp +++ b/quic/api/QuicPacketScheduler.cpp @@ -114,10 +114,7 @@ FrameScheduler FrameScheduler::Builder::build() && { FrameScheduler::FrameScheduler(std::string name) : name_(std::move(name)) {} -std::pair< - folly::Optional, - folly::Optional> -FrameScheduler::scheduleFramesForPacket( +SchedulingResult FrameScheduler::scheduleFramesForPacket( RegularQuicPacketBuilder&& builder, uint32_t writableBytes) { // We need to keep track of writable bytes after writing header. @@ -173,7 +170,7 @@ FrameScheduler::scheduleFramesForPacket( streamFrameScheduler_->writeStreams(wrapper); } - return std::make_pair(folly::none, std::move(builder).buildPacket()); + return SchedulingResult(folly::none, std::move(builder).buildPacket()); } bool FrameScheduler::hasData() const { @@ -497,10 +494,7 @@ bool CryptoStreamScheduler::hasData() const { !cryptoStream_.lossBuffer.empty(); } -std::pair< - folly::Optional, - folly::Optional> -CryptoStreamScheduler::scheduleFramesForPacket( +SchedulingResult CryptoStreamScheduler::scheduleFramesForPacket( RegularQuicPacketBuilder&& builder, uint32_t writableBytes) { // We need to keep track of writable bytes after writing header. @@ -508,11 +502,11 @@ CryptoStreamScheduler::scheduleFramesForPacket( ? writableBytes - builder.getHeaderBytes() : 0; if (!writableBytes) { - return std::make_pair(folly::none, folly::none); + return SchedulingResult(folly::none, folly::none); } PacketBuilderWrapper wrapper(builder, writableBytes); writeCryptoData(wrapper); - return std::make_pair(folly::none, std::move(builder).buildPacket()); + return SchedulingResult(folly::none, std::move(builder).buildPacket()); } CloningScheduler::CloningScheduler( @@ -532,10 +526,7 @@ bool CloningScheduler::hasData() const { conn_.outstandingHandshakePacketsCount); } -std::pair< - folly::Optional, - folly::Optional> -CloningScheduler::scheduleFramesForPacket( +SchedulingResult CloningScheduler::scheduleFramesForPacket( RegularQuicPacketBuilder&& builder, uint32_t writableBytes) { // The writableBytes in this function shouldn't be limited by cwnd, since @@ -589,11 +580,11 @@ CloningScheduler::scheduleFramesForPacket( // Rebuilder will write the rest of frames auto rebuildResult = rebuilder.rebuildFromPacket(*iter); if (rebuildResult) { - return std::make_pair( + return SchedulingResult( std::move(rebuildResult), std::move(regularBuilder).buildPacket()); } } - return std::make_pair(folly::none, folly::none); + return SchedulingResult(folly::none, folly::none); } std::string CloningScheduler::name() const { diff --git a/quic/api/QuicPacketScheduler.h b/quic/api/QuicPacketScheduler.h index d955f4759..40ebac463 100644 --- a/quic/api/QuicPacketScheduler.h +++ b/quic/api/QuicPacketScheduler.h @@ -20,6 +20,16 @@ namespace quic { +struct SchedulingResult { + folly::Optional packetEvent; + folly::Optional packet; + + explicit SchedulingResult( + folly::Optional packetEventIn, + folly::Optional packetIn) + : packetEvent(std::move(packetEventIn)), packet(std::move(packetIn)) {} +}; + /** * Common interface for Quic packet schedulers * used at the top level. @@ -35,10 +45,7 @@ class QuicPacketScheduler { * Returns an optional PacketEvent which indicates if the built out packet is * a clone and the associated PacketEvent for both origin and clone. */ - virtual std::pair< - folly::Optional, - folly::Optional> - scheduleFramesForPacket( + virtual SchedulingResult scheduleFramesForPacket( RegularQuicPacketBuilder&& builder, uint32_t writableBytes) = 0; @@ -290,10 +297,7 @@ class CryptoStreamScheduler { * clone and the associated PacketEvent for both origin and clone. In the case * of CryptoStreamScheduler, this will always return folly::none. */ - std::pair< - folly::Optional, - folly::Optional> - scheduleFramesForPacket( + SchedulingResult scheduleFramesForPacket( RegularQuicPacketBuilder&& builder, uint32_t writableBytes); @@ -350,10 +354,7 @@ class FrameScheduler : public QuicPacketScheduler { explicit FrameScheduler(std::string name); - virtual std::pair< - folly::Optional, - folly::Optional> - scheduleFramesForPacket( + SchedulingResult scheduleFramesForPacket( RegularQuicPacketBuilder&& builder, uint32_t writableBytes) override; @@ -404,10 +405,7 @@ class CloningScheduler : public QuicPacketScheduler { * Returns a optional PacketEvent which indicates if the built out packet is a * clone and the associated PacketEvent for both origin and clone. */ - std::pair< - folly::Optional, - folly::Optional> - scheduleFramesForPacket( + SchedulingResult scheduleFramesForPacket( RegularQuicPacketBuilder&& builder, uint32_t writableBytes) override; diff --git a/quic/api/QuicTransportFunctions.cpp b/quic/api/QuicTransportFunctions.cpp index 3f39a6149..71a49b18c 100644 --- a/quic/api/QuicTransportFunctions.cpp +++ b/quic/api/QuicTransportFunctions.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include @@ -162,6 +161,75 @@ uint64_t writeQuicDataToSocketImpl( return written; } +DataPathResult iobufChainBasedBuildScheduleEncrypt( + QuicConnectionStateBase& connection, + PacketHeader header, + PacketNumberSpace pnSpace, + PacketNum packetNum, + uint64_t cipherOverhead, + QuicPacketScheduler& scheduler, + uint64_t writableBytes, + IOBufQuicBatch& ioBufBatch, + const Aead& aead, + const PacketNumberCipher& headerCipher) { + RegularQuicPacketBuilder pktBuilder( + connection.udpSendPacketLen, + std::move(header), + getAckState(connection, pnSpace).largestAckedByPeer); + pktBuilder.setCipherOverhead(cipherOverhead); + auto result = + scheduler.scheduleFramesForPacket(std::move(pktBuilder), writableBytes); + auto& packet = result.packet; + if (!packet || packet->packet.frames.empty()) { + ioBufBatch.flush(); + if (connection.loopDetectorCallback) { + connection.writeDebugState.noWriteReason = NoWriteReason::NO_FRAME; + } + return DataPathResult::makeBuildFailure(); + } + if (!packet->body) { + // No more space remaining. + ioBufBatch.flush(); + if (connection.loopDetectorCallback) { + connection.writeDebugState.noWriteReason = NoWriteReason::NO_BODY; + } + return DataPathResult::makeBuildFailure(); + } + packet->header->coalesce(); + auto headerLen = packet->header->length(); + auto bodyLen = packet->body->computeChainDataLength(); + auto unencrypted = + folly::IOBuf::create(headerLen + bodyLen + aead.getCipherOverhead()); + auto bodyCursor = folly::io::Cursor(packet->body.get()); + bodyCursor.pull(unencrypted->writableData() + headerLen, bodyLen); + unencrypted->advance(headerLen); + unencrypted->append(bodyLen); + auto packetBuf = + aead.encrypt(std::move(unencrypted), packet->header.get(), packetNum); + DCHECK(packetBuf->headroom() == headerLen); + packetBuf->clear(); + auto headerCursor = folly::io::Cursor(packet->header.get()); + headerCursor.pull(packetBuf->writableData(), headerLen); + packetBuf->append(headerLen + bodyLen + aead.getCipherOverhead()); + + HeaderForm headerForm = packet->packet.header.getHeaderForm(); + encryptPacketHeader( + headerForm, + packetBuf->writableData(), + headerLen, + packetBuf->data() + headerLen, + packetBuf->length() - headerLen, + headerCipher); + auto encodedSize = packetBuf->computeChainDataLength(); + bool ret = ioBufBatch.write(std::move(packetBuf), encodedSize); + if (ret) { + // update stats and connection + QUIC_STATS(connection.infoCallback, onWrite, encodedSize); + QUIC_STATS(connection.infoCallback, onPacketSent); + } + return DataPathResult::makeWriteResult(ret, std::move(result), encodedSize); +} + } // namespace namespace quic { @@ -950,73 +1018,40 @@ uint64_t writeConnectionDataToSocket( } else { writableBytes -= cipherOverhead; } - RegularQuicPacketBuilder pktBuilder( - connection.udpSendPacketLen, + + // TODO: Select a different DataPathFunc based on TransportSettings + const auto& dataPlainFunc = iobufChainBasedBuildScheduleEncrypt; + auto ret = dataPlainFunc( + connection, std::move(header), - getAckState(connection, pnSpace).largestAckedByPeer); - pktBuilder.setCipherOverhead(cipherOverhead); - auto result = - scheduler.scheduleFramesForPacket(std::move(pktBuilder), writableBytes); - auto& packet = result.second; - if (!packet || packet->packet.frames.empty()) { - ioBufBatch.flush(); - if (connection.loopDetectorCallback) { - connection.writeDebugState.noWriteReason = NoWriteReason::NO_FRAME; - } - return ioBufBatch.getPktSent(); - } - if (!packet->body) { - // No more space remaining. - ioBufBatch.flush(); - if (connection.loopDetectorCallback) { - connection.writeDebugState.noWriteReason = NoWriteReason::NO_BODY; - } - return ioBufBatch.getPktSent(); - } - packet->header->coalesce(); - auto headerLen = packet->header->length(); - auto bodyLen = packet->body->computeChainDataLength(); - auto unencrypted = - folly::IOBuf::create(headerLen + bodyLen + aead.getCipherOverhead()); - auto bodyCursor = folly::io::Cursor(packet->body.get()); - bodyCursor.pull(unencrypted->writableData() + headerLen, bodyLen); - unencrypted->advance(headerLen); - unencrypted->append(bodyLen); - auto packetBuf = - aead.encrypt(std::move(unencrypted), packet->header.get(), packetNum); - DCHECK(packetBuf->headroom() == headerLen); - packetBuf->clear(); - auto headerCursor = folly::io::Cursor(packet->header.get()); - headerCursor.pull(packetBuf->writableData(), headerLen); - packetBuf->append(headerLen + bodyLen + aead.getCipherOverhead()); - - HeaderForm headerForm = packet->packet.header.getHeaderForm(); - encryptPacketHeader( - headerForm, - packetBuf->writableData(), - headerLen, - packetBuf->data() + headerLen, - packetBuf->length() - headerLen, + pnSpace, + packetNum, + cipherOverhead, + scheduler, + writableBytes, + ioBufBatch, + aead, headerCipher); - auto encodedSize = packetBuf->computeChainDataLength(); - bool ret = ioBufBatch.write(std::move(packetBuf), encodedSize); - if (ret) { - // update stats and connection - QUIC_STATS(connection.infoCallback, onWrite, encodedSize); - QUIC_STATS(connection.infoCallback, onPacketSent); + if (!ret.buildSuccess) { + return ioBufBatch.getPktSent(); } - + // If we build a packet, we updateConnection(), even if write might have + // been failed. Because if it builds, a lot of states need to be updated no + // matter the write result. We are basically treating this case as if we + // pretend write was also successful but packet is lost somewhere in the + // network. + auto& result = ret.result; updateConnection( connection, - std::move(result.first), - std::move(result.second->packet), + std::move(result->packetEvent), + std::move(result->packet->packet), Clock::now(), - folly::to(encodedSize)); + folly::to(ret.encodedSize)); // if ioBufBatch.write returns false // it is because a flush() call failed - if (!ret) { + if (!ret.writeSuccess) { if (connection.loopDetectorCallback) { connection.writeDebugState.noWriteReason = NoWriteReason::SOCKET_FAILURE; diff --git a/quic/api/QuicTransportFunctions.h b/quic/api/QuicTransportFunctions.h index 3725e6c1c..94abf5443 100644 --- a/quic/api/QuicTransportFunctions.h +++ b/quic/api/QuicTransportFunctions.h @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -20,6 +21,48 @@ // successfully scheduled namespace quic { +struct DataPathResult { + bool buildSuccess{false}; + bool writeSuccess{false}; + folly::Optional result; + uint64_t encodedSize{0}; + + static DataPathResult makeBuildFailure() { + return DataPathResult(); + } + + static DataPathResult makeWriteResult( + bool writeSuc, + SchedulingResult&& res, + uint64_t encodedSizeIn) { + return DataPathResult(writeSuc, std::move(res), encodedSizeIn); + } + + private: + explicit DataPathResult() = default; + + explicit DataPathResult( + bool writeSuc, + SchedulingResult&& res, + uint64_t encodedSizeIn) + : buildSuccess(true), + writeSuccess(writeSuc), + result(std::move(res)), + encodedSize(encodedSizeIn) {} +}; + +using DataPathFunc = std::function; + using HeaderBuilder = std::function, - folly::Optional> - scheduleFramesForPacket( + SchedulingResult scheduleFramesForPacket( RegularQuicPacketBuilder&& builderIn, uint32_t writableBytes) override { auto builder = @@ -42,11 +39,7 @@ class MockFrameScheduler : public FrameScheduler { GMOCK_METHOD0_(, const, , hasData, bool()); MOCK_METHOD2( _scheduleFramesForPacket, - std::pair< - folly::Optional, - folly::Optional>( - std::unique_ptr&, - uint32_t)); + SchedulingResult(std::unique_ptr&, uint32_t)); }; class MockReadCallback : public QuicSocket::ReadCallback { diff --git a/quic/api/test/QuicPacketSchedulerTest.cpp b/quic/api/test/QuicPacketSchedulerTest.cpp index d2f166f85..04f7aab8b 100644 --- a/quic/api/test/QuicPacketSchedulerTest.cpp +++ b/quic/api/test/QuicPacketSchedulerTest.cpp @@ -337,8 +337,8 @@ TEST_F(QuicPacketSchedulerTest, CloningSchedulerTest) { conn.ackStates.appDataAckState.largestAckedByPeer); auto result = cloningScheduler.scheduleFramesForPacket( std::move(builder), kDefaultUDPSendPacketLen); - EXPECT_TRUE(result.first.has_value() && result.second.has_value()); - EXPECT_EQ(packetNum, *result.first); + EXPECT_TRUE(result.packetEvent.has_value() && result.packet.has_value()); + EXPECT_EQ(packetNum, *result.packetEvent); } TEST_F(QuicPacketSchedulerTest, WriteOnlyOutstandingPacketsTest) { @@ -383,10 +383,10 @@ TEST_F(QuicPacketSchedulerTest, WriteOnlyOutstandingPacketsTest) { auto result = cloningScheduler.scheduleFramesForPacket( std::move(regularBuilder), kDefaultUDPSendPacketLen); - EXPECT_TRUE(result.first.has_value() && result.second.has_value()); - EXPECT_EQ(packetNum, *result.first); - // written packet (result.second) should not have any frame in the builder - auto& writtenPacket = *result.second; + EXPECT_TRUE(result.packetEvent.has_value() && result.packet.has_value()); + EXPECT_EQ(packetNum, *result.packetEvent); + // written packet (result.packet) should not have any frame in the builder + auto& writtenPacket = *result.packet; auto shortHeader = writtenPacket.packet.header.asShort(); CHECK(shortHeader); EXPECT_EQ(ProtectionType::KeyPhaseOne, shortHeader->getProtectionType()); @@ -437,8 +437,8 @@ TEST_F(QuicPacketSchedulerTest, DoNotCloneProcessedClonedPacket) { conn.ackStates.initialAckState.largestAckedByPeer); auto result = cloningScheduler.scheduleFramesForPacket( std::move(builder), kDefaultUDPSendPacketLen); - EXPECT_TRUE(result.first.has_value() && result.second.has_value()); - EXPECT_EQ(expected, *result.first); + EXPECT_TRUE(result.packetEvent.has_value() && result.packet.has_value()); + EXPECT_EQ(expected, *result.packetEvent); } TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasDataIgnoresNonAppData) { @@ -482,8 +482,8 @@ TEST_F(QuicPacketSchedulerTest, DoNotCloneHandshake) { conn.ackStates.appDataAckState.largestAckedByPeer); auto result = cloningScheduler.scheduleFramesForPacket( std::move(builder), kDefaultUDPSendPacketLen); - EXPECT_TRUE(result.first.has_value() && result.second.has_value()); - EXPECT_EQ(expected, *result.first); + EXPECT_TRUE(result.packetEvent.has_value() && result.packet.has_value()); + EXPECT_EQ(expected, *result.packetEvent); } TEST_F(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) { @@ -508,7 +508,7 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) { std::move(packet), folly::IOBuf::copyBuffer("if you are the dealer"), folly::IOBuf::copyBuffer("I'm out of the game")); - return std::make_pair(folly::none, std::move(builtPacket)); + return SchedulingResult(folly::none, std::move(builtPacket)); })); RegularQuicPacketBuilder builder( conn.udpSendPacketLen, @@ -516,23 +516,23 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) { conn.ackStates.appDataAckState.largestAckedByPeer); auto result = cloningScheduler.scheduleFramesForPacket( std::move(builder), kDefaultUDPSendPacketLen); - EXPECT_EQ(folly::none, result.first); - EXPECT_EQ(result.second->packet.header.getHeaderForm(), HeaderForm::Short); - ShortHeader& shortHeader = *result.second->packet.header.asShort(); + EXPECT_EQ(folly::none, result.packetEvent); + EXPECT_EQ(result.packet->packet.header.getHeaderForm(), HeaderForm::Short); + ShortHeader& shortHeader = *result.packet->packet.header.asShort(); EXPECT_EQ(ProtectionType::KeyPhaseOne, shortHeader.getProtectionType()); EXPECT_EQ( conn.ackStates.appDataAckState.nextPacketNum, shortHeader.getPacketSequenceNum()); - EXPECT_EQ(1, result.second->packet.frames.size()); + EXPECT_EQ(1, result.packet->packet.frames.size()); MaxDataFrame* maxDataFrame = - result.second->packet.frames.front().asMaxDataFrame(); + result.packet->packet.frames.front().asMaxDataFrame(); ASSERT_NE(maxDataFrame, nullptr); EXPECT_EQ(2832, maxDataFrame->maximumData); EXPECT_TRUE(folly::IOBufEqualTo{}( *folly::IOBuf::copyBuffer("if you are the dealer"), - *result.second->header)); + *result.packet->header)); EXPECT_TRUE(folly::IOBufEqualTo{}( - *folly::IOBuf::copyBuffer("I'm out of the game"), *result.second->body)); + *folly::IOBuf::copyBuffer("I'm out of the game"), *result.packet->body)); } TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) { @@ -565,9 +565,9 @@ TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) { conn.ackStates.appDataAckState.largestAckedByPeer); auto packetResult = cloningScheduler.scheduleFramesForPacket( std::move(builder), conn.udpSendPacketLen); - EXPECT_EQ(expectedPacketEvent, *packetResult.first); + EXPECT_EQ(expectedPacketEvent, *packetResult.packetEvent); int32_t verifyConnWindowUpdate = 1, verifyStreamWindowUpdate = 1; - for (const auto& frame : packetResult.second->packet.frames) { + for (const auto& frame : packetResult.packet->packet.frames) { switch (frame.type()) { case QuicWriteFrame::Type::MaxStreamDataFrame_E: { const MaxStreamDataFrame& maxStreamDataFrame = @@ -592,10 +592,10 @@ TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) { EXPECT_EQ(0, verifyConnWindowUpdate); // Verify the built out packet has refreshed window update values - EXPECT_GE(packetResult.second->packet.frames.size(), 2); + EXPECT_GE(packetResult.packet->packet.frames.size(), 2); uint32_t streamWindowUpdateCounter = 0; uint32_t connWindowUpdateCounter = 0; - for (auto& frame : packetResult.second->packet.frames) { + for (auto& frame : packetResult.packet->packet.frames) { auto streamFlowControl = frame.asMaxStreamDataFrame(); if (!streamFlowControl) { continue; @@ -603,7 +603,7 @@ TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) { streamWindowUpdateCounter++; EXPECT_EQ(1700, streamFlowControl->maximumData); } - for (auto& frame : packetResult.second->packet.frames) { + for (auto& frame : packetResult.packet->packet.frames) { auto connFlowControl = frame.asMaxDataFrame(); if (!connFlowControl) { continue; diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index 5bb675570..b62772409 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -165,6 +165,9 @@ struct TransportSettings { DurationRep timeReorderingThreshDivisor{kDefaultTimeReorderingThreshDivisor}; // Whether to close client transport on read error from socket bool closeClientOnReadError{false}; + // A temporary type to control DataPath write style. Will be gone after we + // are done with experiment. + DataPathType dataPathType{DataPathType::ChainedMemory}; }; } // namespace quic