mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-11-25 15:43:13 +03:00
Summary: This is similar to the previous commit. We use a stack-based IOBuf instead of allocating one on the heap. This saves CPU because allocations/deallocations on the stack are a lot cheaper. Reviewed By: hanidamlaj Differential Revision: D53101722 fbshipit-source-id: dd59a7eca6498db19472a62f954db3e2f2f27a42
181 lines
5.8 KiB
C++
181 lines
5.8 KiB
C++
/*
|
|
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
*
|
|
* This source code is licensed under the MIT license found in the
|
|
* LICENSE file in the root directory of this source tree.
|
|
*/
|
|
|
|
#include <quic/api/IoBufQuicBatch.h>
|
|
#include <quic/api/QuicGsoBatchWriters.h>
|
|
#include <quic/api/QuicTransportFunctions.h>
|
|
#include <quic/dsr/backend/DSRPacketizer.h>
|
|
|
|
namespace quic {
|
|
/**
 * Builds one short-header (KeyPhaseZero) QUIC packet that carries a single
 * stream frame, encrypts it in place inside the buffer owned by `accessor`,
 * and hands its encoded size to `ioBufBatch`.
 *
 * The packet is appended after whatever the accessor's buffer already holds;
 * that existing content (`prevSize` bytes) is temporarily trimmed away for
 * encryption and restored before the buffer is released back to the accessor.
 *
 * @param ioBufBatch          Batch that receives the packet via write(); it is
 *                            flushed when packet building fails.
 * @param accessor            Provides the contiguous buffer the packet is
 *                            built and encrypted in.
 * @param dcid                Connection id placed in the short header.
 * @param packetNum           Packet number for the header and for encryption.
 * @param largestAckedByPeer  Forwarded to the packet builder.
 * @param aead                Cipher used for the in-place body encryption.
 * @param headerCipher        Cipher used for header protection.
 * @param streamId            Stream the frame belongs to.
 * @param offset              Stream offset of the frame data.
 * @param length              Number of bytes requested for the frame; also
 *                            passed as the flow control limit.
 * @param eof                 Whether the frame carries the FIN bit.
 * @param buf                 Source of the stream data; must hold at least
 *                            `length` bytes.
 * @return false if `buf` is too short or if the builder produced an empty
 *         packet or an empty body; otherwise the result of
 *         ioBufBatch.write().
 */
bool writeSingleQuicPacket(
    IOBufQuicBatch& ioBufBatch,
    BufAccessor& accessor,
    ConnectionId dcid,
    PacketNum packetNum,
    PacketNum largestAckedByPeer,
    const Aead& aead,
    const PacketNumberCipher& headerCipher,
    StreamId streamId,
    size_t offset,
    size_t length,
    bool eof,
    Buf buf) {
  if (buf->computeChainDataLength() < length) {
    LOG(ERROR) << "Insufficient data buffer";
    return false;
  }

  // Remember how much data the shared buffer held before this packet so that
  // a failed build can be undone and earlier packets can be restored later.
  auto buildBuf = accessor.obtain();
  auto prevSize = buildBuf->length();
  accessor.release(std::move(buildBuf));

  // Drops everything appended to the shared buffer since prevSize was taken.
  auto rollbackBuf = [&accessor, prevSize]() {
    auto sharedBuf = accessor.obtain();
    sharedBuf->trimEnd(sharedBuf->length() - prevSize);
    accessor.release(std::move(sharedBuf));
  };

  ShortHeader shortHeader(ProtectionType::KeyPhaseZero, dcid, packetNum);
  InplaceQuicPacketBuilder builder(
      accessor,
      kDefaultMaxUDPPayload,
      std::move(shortHeader),
      largestAckedByPeer,
      0);
  builder.encodePacketHeader();
  builder.accountForCipherOverhead(aead.getCipherOverhead());
  // frontend has already limited the length to flow control, thus
  // flowControlLen == length
  auto dataLen = writeStreamFrameHeader(
      builder,
      streamId,
      offset,
      length,
      length /* flow control len*/,
      eof,
      true /* skip length field in stream header */,
      folly::none, /* stream group id */
      false /* don't append frame to builder */);
  BufQueue bufQueue(std::move(buf));
  writeStreamFrameData(builder, bufQueue, *dataLen);
  auto packet = std::move(builder).buildPacket();
  CHECK(accessor.ownsBuffer());

  if (packet.packet.empty) {
    LOG(ERROR) << "DSR Send failed: Build empty packet.";
    // Undo whatever the builder appended before flushing, exactly like the
    // empty-body path below; otherwise those bytes stay in the shared buffer
    // ahead of anything written afterwards.
    rollbackBuf();
    ioBufBatch.flush();
    return false;
  }
  if (packet.body.empty()) {
    LOG(ERROR) << "DSR Send failed: Build empty body buffer";
    rollbackBuf();
    ioBufBatch.flush();
    return false;
  }
  CHECK(!packet.header.isChained());

  auto headerLen = packet.header.length();
  buildBuf = accessor.obtain();
  // Both header and body must live inside the accessor's buffer for the
  // in-place trimming/prepending below to be valid.
  CHECK(
      packet.body.data() > buildBuf->data() &&
      packet.body.tail() <= buildBuf->tail());
  CHECK(
      packet.header.data() >= buildBuf->data() &&
      packet.header.tail() < buildBuf->tail());
  // Trim off everything before the current packet, and the header length, so
  // buildBuf's data starts from the body part of buildBuf.
  buildBuf->trimStart(prevSize + headerLen);
  // buildBuf and packetbuildBuf is actually the same.
  auto packetbuildBuf =
      aead.inplaceEncrypt(std::move(buildBuf), &packet.header, packetNum);
  CHECK_EQ(packetbuildBuf->headroom(), headerLen + prevSize);
  // Include header back.
  packetbuildBuf->prepend(headerLen);

  HeaderForm headerForm = packet.packet.header.getHeaderForm();
  encryptPacketHeader(
      headerForm,
      packetbuildBuf->writableData(),
      headerLen,
      packetbuildBuf->data() + headerLen,
      packetbuildBuf->length() - headerLen,
      headerCipher);
  CHECK(!packetbuildBuf->isChained());
  // Size of this packet alone (header + encrypted body), captured before the
  // previously written packets are made visible again.
  auto encodedSize = packetbuildBuf->length();
  // Include previous packets back.
  packetbuildBuf->prepend(prevSize);
  accessor.release(std::move(packetbuildBuf));
  bool ret =
      ioBufBatch.write(nullptr /* no need to pass buildBuf */, encodedSize);
  return ret;
}
|
|
|
|
// TODO using a connection state for this is kind of janky and we should
|
|
// refactor the batch writer interface to not need this.
|
|
// This isn't a real connection, it's just used for the batch writer state.
|
|
// 44 is near the number of the maximum GSO the kernel can accept for a full
|
|
// Ethernet MTU (44 * 1452 = 63888)
|
|
static auto& getThreadLocalConn(size_t maxPackets = 44) {
|
|
static thread_local QuicConnectionStateBase fakeConn{QuicNodeType::Server};
|
|
static thread_local bool initAccessor FOLLY_MAYBE_UNUSED = [&]() {
|
|
fakeConn.bufAccessor =
|
|
new SimpleBufAccessor{kDefaultMaxUDPPayload * maxPackets};
|
|
// Store this so we can use it to set the batch writer.
|
|
fakeConn.transportSettings.maxBatchSize = maxPackets;
|
|
return true;
|
|
}();
|
|
return fakeConn;
|
|
}
|
|
|
|
/**
 * Packetizes every request in `reqGroup` and sends the resulting packets to
 * reqGroup.clientAddress through `sock`, using the calling thread's shared
 * build buffer and a GSO in-place batch writer.
 *
 * @param sock         Socket the batch is written to.
 * @param reqGroup     Requests plus the connection id, client address and
 *                     cipher pair they share.
 * @param bufProvider  Called once per request to obtain its stream data.
 * @return A default-constructed result when there are no requests or a
 *         cipher is missing; otherwise the batch's accumulated result.
 *         Processing stops at the first request that fails to be written.
 */
BufQuicBatchResult writePacketsGroup(
    QuicAsyncUDPSocket& sock,
    RequestGroup& reqGroup,
    const std::function<Buf(const PacketizationRequest& req)>& bufProvider) {
  if (reqGroup.requests.empty()) {
    LOG(ERROR) << "Empty packetization request";
    return {};
  }

  auto& threadConn = getThreadLocalConn();
  auto& sharedAccessor = *threadConn.bufAccessor;
  BatchWriterPtr gsoWriter(new GSOInplacePacketBatchWriter(
      threadConn, threadConn.transportSettings.maxBatchSize));
  IOBufQuicBatch ioBufBatch(
      std::move(gsoWriter),
      sock,
      reqGroup.clientAddress,
      nullptr /* statsCallback */,
      nullptr /* happyEyeballsState */);

  // Both the AEAD and the header cipher are required to emit any packet.
  if (!(reqGroup.cipherPair->aead && reqGroup.cipherPair->headerCipher)) {
    LOG(ERROR) << "Missing ciphers";
    return {};
  }

  // It's ok if reqGroup's size is larger than ioBufBatch's batch size. The
  // ioBufBatch will flush when it hits the limit then start a new batch
  // transparently.
  for (const auto& req : reqGroup.requests) {
    const bool written = writeSingleQuicPacket(
        ioBufBatch,
        sharedAccessor,
        reqGroup.dcid,
        req.packetNum,
        req.largestAckedPacketNum,
        *reqGroup.cipherPair->aead,
        *reqGroup.cipherPair->headerCipher,
        req.streamId,
        req.offset,
        req.len,
        req.fin,
        bufProvider(req));
    if (!written) {
      // Give up on the remaining requests and report what was sent so far.
      return ioBufBatch.getResult();
    }
  }

  ioBufBatch.flush();
  return ioBufBatch.getResult();
}
|
|
|
|
} // namespace quic
|