1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Quic Refactor to move Build->Schedule->Encrypt operation into a functor

Summary:
To prepare for another configuration of this chain. With this, now we
can land all the outstanding GSO optimization diffs without having to worry
about breaking the production code.

Reviewed By: mjoras

Differential Revision: D20838453

fbshipit-source-id: 807a0c546305864e0d70f8989f31d3de3b812278
This commit is contained in:
Yang Chi
2020-04-10 09:13:15 -07:00
committed by Facebook GitHub Bot
parent 9189d56442
commit 7d52f280f8
8 changed files with 195 additions and 123 deletions

View File

@@ -487,4 +487,13 @@ enum class EncryptionLevel : uint8_t {
AppData,
};
/**
* This is a temporary type used during our data path experiment. It may not
* exist for long time.
*/
enum class DataPathType : uint8_t {
ChainedMemory,
ContinuousMemory,
};
} // namespace quic

View File

@@ -114,10 +114,7 @@ FrameScheduler FrameScheduler::Builder::build() && {
FrameScheduler::FrameScheduler(std::string name) : name_(std::move(name)) {}
std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>
FrameScheduler::scheduleFramesForPacket(
SchedulingResult FrameScheduler::scheduleFramesForPacket(
RegularQuicPacketBuilder&& builder,
uint32_t writableBytes) {
// We need to keep track of writable bytes after writing header.
@@ -173,7 +170,7 @@ FrameScheduler::scheduleFramesForPacket(
streamFrameScheduler_->writeStreams(wrapper);
}
return std::make_pair(folly::none, std::move(builder).buildPacket());
return SchedulingResult(folly::none, std::move(builder).buildPacket());
}
bool FrameScheduler::hasData() const {
@@ -497,10 +494,7 @@ bool CryptoStreamScheduler::hasData() const {
!cryptoStream_.lossBuffer.empty();
}
std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>
CryptoStreamScheduler::scheduleFramesForPacket(
SchedulingResult CryptoStreamScheduler::scheduleFramesForPacket(
RegularQuicPacketBuilder&& builder,
uint32_t writableBytes) {
// We need to keep track of writable bytes after writing header.
@@ -508,11 +502,11 @@ CryptoStreamScheduler::scheduleFramesForPacket(
? writableBytes - builder.getHeaderBytes()
: 0;
if (!writableBytes) {
return std::make_pair(folly::none, folly::none);
return SchedulingResult(folly::none, folly::none);
}
PacketBuilderWrapper wrapper(builder, writableBytes);
writeCryptoData(wrapper);
return std::make_pair(folly::none, std::move(builder).buildPacket());
return SchedulingResult(folly::none, std::move(builder).buildPacket());
}
CloningScheduler::CloningScheduler(
@@ -532,10 +526,7 @@ bool CloningScheduler::hasData() const {
conn_.outstandingHandshakePacketsCount);
}
std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>
CloningScheduler::scheduleFramesForPacket(
SchedulingResult CloningScheduler::scheduleFramesForPacket(
RegularQuicPacketBuilder&& builder,
uint32_t writableBytes) {
// The writableBytes in this function shouldn't be limited by cwnd, since
@@ -589,11 +580,11 @@ CloningScheduler::scheduleFramesForPacket(
// Rebuilder will write the rest of frames
auto rebuildResult = rebuilder.rebuildFromPacket(*iter);
if (rebuildResult) {
return std::make_pair(
return SchedulingResult(
std::move(rebuildResult), std::move(regularBuilder).buildPacket());
}
}
return std::make_pair(folly::none, folly::none);
return SchedulingResult(folly::none, folly::none);
}
std::string CloningScheduler::name() const {

View File

@@ -20,6 +20,16 @@
namespace quic {
struct SchedulingResult {
folly::Optional<PacketEvent> packetEvent;
folly::Optional<PacketBuilderInterface::Packet> packet;
explicit SchedulingResult(
folly::Optional<PacketEvent> packetEventIn,
folly::Optional<PacketBuilderInterface::Packet> packetIn)
: packetEvent(std::move(packetEventIn)), packet(std::move(packetIn)) {}
};
/**
* Common interface for Quic packet schedulers
* used at the top level.
@@ -35,10 +45,7 @@ class QuicPacketScheduler {
* Returns an optional PacketEvent which indicates if the built out packet is
* a clone and the associated PacketEvent for both origin and clone.
*/
virtual std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>
scheduleFramesForPacket(
virtual SchedulingResult scheduleFramesForPacket(
RegularQuicPacketBuilder&& builder,
uint32_t writableBytes) = 0;
@@ -290,10 +297,7 @@ class CryptoStreamScheduler {
* clone and the associated PacketEvent for both origin and clone. In the case
* of CryptoStreamScheduler, this will always return folly::none.
*/
std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>
scheduleFramesForPacket(
SchedulingResult scheduleFramesForPacket(
RegularQuicPacketBuilder&& builder,
uint32_t writableBytes);
@@ -350,10 +354,7 @@ class FrameScheduler : public QuicPacketScheduler {
explicit FrameScheduler(std::string name);
virtual std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>
scheduleFramesForPacket(
SchedulingResult scheduleFramesForPacket(
RegularQuicPacketBuilder&& builder,
uint32_t writableBytes) override;
@@ -404,10 +405,7 @@ class CloningScheduler : public QuicPacketScheduler {
* Returns a optional PacketEvent which indicates if the built out packet is a
* clone and the associated PacketEvent for both origin and clone.
*/
std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>
scheduleFramesForPacket(
SchedulingResult scheduleFramesForPacket(
RegularQuicPacketBuilder&& builder,
uint32_t writableBytes) override;

View File

@@ -11,7 +11,6 @@
#include <folly/Overload.h>
#include <quic/QuicConstants.h>
#include <quic/QuicException.h>
#include <quic/api/IoBufQuicBatch.h>
#include <quic/api/QuicTransportFunctions.h>
#include <quic/codec/QuicPacketBuilder.h>
#include <quic/codec/QuicWriteCodec.h>
@@ -162,6 +161,75 @@ uint64_t writeQuicDataToSocketImpl(
return written;
}
DataPathResult iobufChainBasedBuildScheduleEncrypt(
QuicConnectionStateBase& connection,
PacketHeader header,
PacketNumberSpace pnSpace,
PacketNum packetNum,
uint64_t cipherOverhead,
QuicPacketScheduler& scheduler,
uint64_t writableBytes,
IOBufQuicBatch& ioBufBatch,
const Aead& aead,
const PacketNumberCipher& headerCipher) {
RegularQuicPacketBuilder pktBuilder(
connection.udpSendPacketLen,
std::move(header),
getAckState(connection, pnSpace).largestAckedByPeer);
pktBuilder.setCipherOverhead(cipherOverhead);
auto result =
scheduler.scheduleFramesForPacket(std::move(pktBuilder), writableBytes);
auto& packet = result.packet;
if (!packet || packet->packet.frames.empty()) {
ioBufBatch.flush();
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::NO_FRAME;
}
return DataPathResult::makeBuildFailure();
}
if (!packet->body) {
// No more space remaining.
ioBufBatch.flush();
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::NO_BODY;
}
return DataPathResult::makeBuildFailure();
}
packet->header->coalesce();
auto headerLen = packet->header->length();
auto bodyLen = packet->body->computeChainDataLength();
auto unencrypted =
folly::IOBuf::create(headerLen + bodyLen + aead.getCipherOverhead());
auto bodyCursor = folly::io::Cursor(packet->body.get());
bodyCursor.pull(unencrypted->writableData() + headerLen, bodyLen);
unencrypted->advance(headerLen);
unencrypted->append(bodyLen);
auto packetBuf =
aead.encrypt(std::move(unencrypted), packet->header.get(), packetNum);
DCHECK(packetBuf->headroom() == headerLen);
packetBuf->clear();
auto headerCursor = folly::io::Cursor(packet->header.get());
headerCursor.pull(packetBuf->writableData(), headerLen);
packetBuf->append(headerLen + bodyLen + aead.getCipherOverhead());
HeaderForm headerForm = packet->packet.header.getHeaderForm();
encryptPacketHeader(
headerForm,
packetBuf->writableData(),
headerLen,
packetBuf->data() + headerLen,
packetBuf->length() - headerLen,
headerCipher);
auto encodedSize = packetBuf->computeChainDataLength();
bool ret = ioBufBatch.write(std::move(packetBuf), encodedSize);
if (ret) {
// update stats and connection
QUIC_STATS(connection.infoCallback, onWrite, encodedSize);
QUIC_STATS(connection.infoCallback, onPacketSent);
}
return DataPathResult::makeWriteResult(ret, std::move(result), encodedSize);
}
} // namespace
namespace quic {
@@ -950,73 +1018,40 @@ uint64_t writeConnectionDataToSocket(
} else {
writableBytes -= cipherOverhead;
}
RegularQuicPacketBuilder pktBuilder(
connection.udpSendPacketLen,
// TODO: Select a different DataPathFunc based on TransportSettings
const auto& dataPlainFunc = iobufChainBasedBuildScheduleEncrypt;
auto ret = dataPlainFunc(
connection,
std::move(header),
getAckState(connection, pnSpace).largestAckedByPeer);
pktBuilder.setCipherOverhead(cipherOverhead);
auto result =
scheduler.scheduleFramesForPacket(std::move(pktBuilder), writableBytes);
auto& packet = result.second;
if (!packet || packet->packet.frames.empty()) {
ioBufBatch.flush();
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::NO_FRAME;
}
return ioBufBatch.getPktSent();
}
if (!packet->body) {
// No more space remaining.
ioBufBatch.flush();
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::NO_BODY;
}
return ioBufBatch.getPktSent();
}
packet->header->coalesce();
auto headerLen = packet->header->length();
auto bodyLen = packet->body->computeChainDataLength();
auto unencrypted =
folly::IOBuf::create(headerLen + bodyLen + aead.getCipherOverhead());
auto bodyCursor = folly::io::Cursor(packet->body.get());
bodyCursor.pull(unencrypted->writableData() + headerLen, bodyLen);
unencrypted->advance(headerLen);
unencrypted->append(bodyLen);
auto packetBuf =
aead.encrypt(std::move(unencrypted), packet->header.get(), packetNum);
DCHECK(packetBuf->headroom() == headerLen);
packetBuf->clear();
auto headerCursor = folly::io::Cursor(packet->header.get());
headerCursor.pull(packetBuf->writableData(), headerLen);
packetBuf->append(headerLen + bodyLen + aead.getCipherOverhead());
HeaderForm headerForm = packet->packet.header.getHeaderForm();
encryptPacketHeader(
headerForm,
packetBuf->writableData(),
headerLen,
packetBuf->data() + headerLen,
packetBuf->length() - headerLen,
pnSpace,
packetNum,
cipherOverhead,
scheduler,
writableBytes,
ioBufBatch,
aead,
headerCipher);
auto encodedSize = packetBuf->computeChainDataLength();
bool ret = ioBufBatch.write(std::move(packetBuf), encodedSize);
if (ret) {
// update stats and connection
QUIC_STATS(connection.infoCallback, onWrite, encodedSize);
QUIC_STATS(connection.infoCallback, onPacketSent);
if (!ret.buildSuccess) {
return ioBufBatch.getPktSent();
}
// If we build a packet, we updateConnection(), even if write might have
// been failed. Because if it builds, a lot of states need to be updated no
// matter the write result. We are basically treating this case as if we
// pretend write was also successful but packet is lost somewhere in the
// network.
auto& result = ret.result;
updateConnection(
connection,
std::move(result.first),
std::move(result.second->packet),
std::move(result->packetEvent),
std::move(result->packet->packet),
Clock::now(),
folly::to<uint32_t>(encodedSize));
folly::to<uint32_t>(ret.encodedSize));
// if ioBufBatch.write returns false
// it is because a flush() call failed
if (!ret) {
if (!ret.writeSuccess) {
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason =
NoWriteReason::SOCKET_FAILURE;

View File

@@ -12,6 +12,7 @@
#include <folly/io/async/AsyncUDPSocket.h>
#include <quic/QuicException.h>
#include <quic/api/IoBufQuicBatch.h>
#include <quic/api/QuicPacketScheduler.h>
#include <quic/api/QuicSocket.h>
#include <quic/state/StateData.h>
@@ -20,6 +21,48 @@
// successfully scheduled
namespace quic {
struct DataPathResult {
bool buildSuccess{false};
bool writeSuccess{false};
folly::Optional<SchedulingResult> result;
uint64_t encodedSize{0};
static DataPathResult makeBuildFailure() {
return DataPathResult();
}
static DataPathResult makeWriteResult(
bool writeSuc,
SchedulingResult&& res,
uint64_t encodedSizeIn) {
return DataPathResult(writeSuc, std::move(res), encodedSizeIn);
}
private:
explicit DataPathResult() = default;
explicit DataPathResult(
bool writeSuc,
SchedulingResult&& res,
uint64_t encodedSizeIn)
: buildSuccess(true),
writeSuccess(writeSuc),
result(std::move(res)),
encodedSize(encodedSizeIn) {}
};
using DataPathFunc = std::function<DataPathResult(
QuicConnectionStateBase&,
PacketHeader,
PacketNumberSpace,
PacketNum,
uint64_t,
QuicPacketScheduler&,
uint64_t,
IOBufQuicBatch&,
const Aead&,
const PacketNumberCipher&)>;
using HeaderBuilder = std::function<PacketHeader(
const ConnectionId& srcConnId,
const ConnectionId& dstConnId,

View File

@@ -28,10 +28,7 @@ class MockFrameScheduler : public FrameScheduler {
MockFrameScheduler() : FrameScheduler("mock") {}
// override methods accepting rvalue ref since gmock doesn't support it
std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>
scheduleFramesForPacket(
SchedulingResult scheduleFramesForPacket(
RegularQuicPacketBuilder&& builderIn,
uint32_t writableBytes) override {
auto builder =
@@ -42,11 +39,7 @@ class MockFrameScheduler : public FrameScheduler {
GMOCK_METHOD0_(, const, , hasData, bool());
MOCK_METHOD2(
_scheduleFramesForPacket,
std::pair<
folly::Optional<PacketEvent>,
folly::Optional<RegularQuicPacketBuilder::Packet>>(
std::unique_ptr<RegularQuicPacketBuilder>&,
uint32_t));
SchedulingResult(std::unique_ptr<RegularQuicPacketBuilder>&, uint32_t));
};
class MockReadCallback : public QuicSocket::ReadCallback {

View File

@@ -337,8 +337,8 @@ TEST_F(QuicPacketSchedulerTest, CloningSchedulerTest) {
conn.ackStates.appDataAckState.largestAckedByPeer);
auto result = cloningScheduler.scheduleFramesForPacket(
std::move(builder), kDefaultUDPSendPacketLen);
EXPECT_TRUE(result.first.has_value() && result.second.has_value());
EXPECT_EQ(packetNum, *result.first);
EXPECT_TRUE(result.packetEvent.has_value() && result.packet.has_value());
EXPECT_EQ(packetNum, *result.packetEvent);
}
TEST_F(QuicPacketSchedulerTest, WriteOnlyOutstandingPacketsTest) {
@@ -383,10 +383,10 @@ TEST_F(QuicPacketSchedulerTest, WriteOnlyOutstandingPacketsTest) {
auto result = cloningScheduler.scheduleFramesForPacket(
std::move(regularBuilder), kDefaultUDPSendPacketLen);
EXPECT_TRUE(result.first.has_value() && result.second.has_value());
EXPECT_EQ(packetNum, *result.first);
// written packet (result.second) should not have any frame in the builder
auto& writtenPacket = *result.second;
EXPECT_TRUE(result.packetEvent.has_value() && result.packet.has_value());
EXPECT_EQ(packetNum, *result.packetEvent);
// written packet (result.packet) should not have any frame in the builder
auto& writtenPacket = *result.packet;
auto shortHeader = writtenPacket.packet.header.asShort();
CHECK(shortHeader);
EXPECT_EQ(ProtectionType::KeyPhaseOne, shortHeader->getProtectionType());
@@ -437,8 +437,8 @@ TEST_F(QuicPacketSchedulerTest, DoNotCloneProcessedClonedPacket) {
conn.ackStates.initialAckState.largestAckedByPeer);
auto result = cloningScheduler.scheduleFramesForPacket(
std::move(builder), kDefaultUDPSendPacketLen);
EXPECT_TRUE(result.first.has_value() && result.second.has_value());
EXPECT_EQ(expected, *result.first);
EXPECT_TRUE(result.packetEvent.has_value() && result.packet.has_value());
EXPECT_EQ(expected, *result.packetEvent);
}
TEST_F(QuicPacketSchedulerTest, CloneSchedulerHasDataIgnoresNonAppData) {
@@ -482,8 +482,8 @@ TEST_F(QuicPacketSchedulerTest, DoNotCloneHandshake) {
conn.ackStates.appDataAckState.largestAckedByPeer);
auto result = cloningScheduler.scheduleFramesForPacket(
std::move(builder), kDefaultUDPSendPacketLen);
EXPECT_TRUE(result.first.has_value() && result.second.has_value());
EXPECT_EQ(expected, *result.first);
EXPECT_TRUE(result.packetEvent.has_value() && result.packet.has_value());
EXPECT_EQ(expected, *result.packetEvent);
}
TEST_F(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) {
@@ -508,7 +508,7 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) {
std::move(packet),
folly::IOBuf::copyBuffer("if you are the dealer"),
folly::IOBuf::copyBuffer("I'm out of the game"));
return std::make_pair(folly::none, std::move(builtPacket));
return SchedulingResult(folly::none, std::move(builtPacket));
}));
RegularQuicPacketBuilder builder(
conn.udpSendPacketLen,
@@ -516,23 +516,23 @@ TEST_F(QuicPacketSchedulerTest, CloneSchedulerUseNormalSchedulerFirst) {
conn.ackStates.appDataAckState.largestAckedByPeer);
auto result = cloningScheduler.scheduleFramesForPacket(
std::move(builder), kDefaultUDPSendPacketLen);
EXPECT_EQ(folly::none, result.first);
EXPECT_EQ(result.second->packet.header.getHeaderForm(), HeaderForm::Short);
ShortHeader& shortHeader = *result.second->packet.header.asShort();
EXPECT_EQ(folly::none, result.packetEvent);
EXPECT_EQ(result.packet->packet.header.getHeaderForm(), HeaderForm::Short);
ShortHeader& shortHeader = *result.packet->packet.header.asShort();
EXPECT_EQ(ProtectionType::KeyPhaseOne, shortHeader.getProtectionType());
EXPECT_EQ(
conn.ackStates.appDataAckState.nextPacketNum,
shortHeader.getPacketSequenceNum());
EXPECT_EQ(1, result.second->packet.frames.size());
EXPECT_EQ(1, result.packet->packet.frames.size());
MaxDataFrame* maxDataFrame =
result.second->packet.frames.front().asMaxDataFrame();
result.packet->packet.frames.front().asMaxDataFrame();
ASSERT_NE(maxDataFrame, nullptr);
EXPECT_EQ(2832, maxDataFrame->maximumData);
EXPECT_TRUE(folly::IOBufEqualTo{}(
*folly::IOBuf::copyBuffer("if you are the dealer"),
*result.second->header));
*result.packet->header));
EXPECT_TRUE(folly::IOBufEqualTo{}(
*folly::IOBuf::copyBuffer("I'm out of the game"), *result.second->body));
*folly::IOBuf::copyBuffer("I'm out of the game"), *result.packet->body));
}
TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) {
@@ -565,9 +565,9 @@ TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) {
conn.ackStates.appDataAckState.largestAckedByPeer);
auto packetResult = cloningScheduler.scheduleFramesForPacket(
std::move(builder), conn.udpSendPacketLen);
EXPECT_EQ(expectedPacketEvent, *packetResult.first);
EXPECT_EQ(expectedPacketEvent, *packetResult.packetEvent);
int32_t verifyConnWindowUpdate = 1, verifyStreamWindowUpdate = 1;
for (const auto& frame : packetResult.second->packet.frames) {
for (const auto& frame : packetResult.packet->packet.frames) {
switch (frame.type()) {
case QuicWriteFrame::Type::MaxStreamDataFrame_E: {
const MaxStreamDataFrame& maxStreamDataFrame =
@@ -592,10 +592,10 @@ TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) {
EXPECT_EQ(0, verifyConnWindowUpdate);
// Verify the built out packet has refreshed window update values
EXPECT_GE(packetResult.second->packet.frames.size(), 2);
EXPECT_GE(packetResult.packet->packet.frames.size(), 2);
uint32_t streamWindowUpdateCounter = 0;
uint32_t connWindowUpdateCounter = 0;
for (auto& frame : packetResult.second->packet.frames) {
for (auto& frame : packetResult.packet->packet.frames) {
auto streamFlowControl = frame.asMaxStreamDataFrame();
if (!streamFlowControl) {
continue;
@@ -603,7 +603,7 @@ TEST_F(QuicPacketSchedulerTest, CloneWillGenerateNewWindowUpdate) {
streamWindowUpdateCounter++;
EXPECT_EQ(1700, streamFlowControl->maximumData);
}
for (auto& frame : packetResult.second->packet.frames) {
for (auto& frame : packetResult.packet->packet.frames) {
auto connFlowControl = frame.asMaxDataFrame();
if (!connFlowControl) {
continue;

View File

@@ -165,6 +165,9 @@ struct TransportSettings {
DurationRep timeReorderingThreshDivisor{kDefaultTimeReorderingThreshDivisor};
// Whether to close client transport on read error from socket
bool closeClientOnReadError{false};
// A temporary type to control DataPath write style. Will be gone after we
// are done with experiment.
DataPathType dataPathType{DataPathType::ChainedMemory};
};
} // namespace quic