Mirror of https://github.com/facebookincubator/mvfst.git

process multiple packets on recvmsg

Summary:
In the current client code we read one packet, go back to epoll, and then read
the next packet, paying a full event-loop round trip for every datagram.

This change lets us read multiple packets in one go from a single epoll
callback.

This only changes the client; the server keeps its existing read path.
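
The core pattern, as a minimal standalone sketch: plain POSIX recvmsg on a
non-blocking socket rather than mvfst's folly::AsyncUDPSocket, with a
hypothetical readBatch helper; the 1500-byte buffers and the batch limit are
illustrative, not mvfst APIs.

    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/uio.h>
    #include <cstddef>
    #include <vector>

    // Drain up to maxBatch datagrams from a non-blocking UDP socket in one
    // readiness callback instead of taking one epoll wakeup per datagram.
    std::vector<std::vector<char>> readBatch(int fd, size_t maxBatch) {
      std::vector<std::vector<char>> packets;
      for (size_t i = 0; i < maxBatch; ++i) {
        std::vector<char> buf(1500); // one private buffer per packet
        struct iovec vec{};
        vec.iov_base = buf.data();
        vec.iov_len = buf.size();
        struct msghdr msg{};
        msg.msg_iov = &vec;
        msg.msg_iovlen = 1;
        ssize_t ret = recvmsg(fd, &msg, 0);
        if (ret < 0) {
          break; // EAGAIN/EWOULDBLOCK: socket drained; keep what we have
        }
        buf.resize(static_cast<size_t>(ret));
        packets.emplace_back(std::move(buf));
      }
      return packets;
    }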

Reviewed By: mjoras

Differential Revision: D18797962

fbshipit-source-id: 81be82111064ade4fe3a07b1d9d3d01e180f29f5
Subodh Iyengar authored on 2019-12-04 12:02:24 -08:00; committed by Facebook Github Bot
parent 02d473e8ec
commit d2fa2cbcd6
16 changed files with 365 additions and 103 deletions


@@ -1579,12 +1579,13 @@ void QuicTransportBase::onNetworkData(
     updateWriteLooper(true);
   };
   try {
-    if (networkData.data) {
-      conn_->lossState.totalBytesRecvd +=
-          networkData.data->computeChainDataLength();
-    }
+    conn_->lossState.totalBytesRecvd += networkData.totalData;
     auto originalAckVersion = currentAckStateVersion(*conn_);
-    onReadData(peer, std::move(networkData));
+    for (auto& packet : networkData.packets) {
+      onReadData(
+          peer,
+          NetworkDataSingle(std::move(packet), networkData.receiveTimePoint));
+    }
     processCallbacksAfterNetworkData();
     if (closeState_ != CloseState::CLOSED) {
       if (currentAckStateVersion(*conn_) != originalAckVersion) {


@@ -278,7 +278,7 @@ class QuicTransportBase : public QuicSocket {
    */
   virtual void onReadData(
       const folly::SocketAddress& peer,
-      NetworkData&& networkData) = 0;
+      NetworkDataSingle&& networkData) = 0;

   /**
    * Invoked when we have to write some data to the wire.


@@ -201,7 +201,7 @@ class MockQuicTransport : public QuicServerTransport {
   void onNetworkData(
       const folly::SocketAddress& peer,
       NetworkData&& networkData) noexcept override {
-    onNetworkData(peer, networkData.data.get());
+    onNetworkData(peer, networkData);
   }

   GMOCK_METHOD2_(
@@ -209,7 +209,7 @@ class MockQuicTransport : public QuicServerTransport {
       noexcept,
       ,
       onNetworkData,
-      void(const folly::SocketAddress&, const folly::IOBuf*));
+      void(const folly::SocketAddress&, const NetworkData&));

   GMOCK_METHOD1_(
       ,


@@ -156,7 +156,8 @@ class TestQuicTransport
     return lossTimeout_.getTimeRemaining();
   }

-  void onReadData(const folly::SocketAddress&, NetworkData&& data) override {
+  void onReadData(const folly::SocketAddress&, NetworkDataSingle&& data)
+      override {
     if (!data.data) {
       return;
     }


@@ -78,7 +78,7 @@ class TestQuicTransport

   void onReadData(
       const folly::SocketAddress& /*peer*/,
-      NetworkData&& /*networkData*/) noexcept override {}
+      NetworkDataSingle&& /*networkData*/) noexcept override {}

   void writeData() override {
     if (closed) {


@@ -95,7 +95,7 @@ QuicClientTransport::~QuicClientTransport() {

 void QuicClientTransport::processUDPData(
     const folly::SocketAddress& peer,
-    NetworkData&& networkData) {
+    NetworkDataSingle&& networkData) {
   folly::IOBufQueue udpData{folly::IOBufQueue::cacheChainLength()};
   udpData.append(std::move(networkData.data));

@@ -652,7 +652,7 @@ void QuicClientTransport::processPacketData(

 void QuicClientTransport::onReadData(
     const folly::SocketAddress& peer,
-    NetworkData&& networkData) {
+    NetworkDataSingle&& networkData) {
   if (closeState_ == CloseState::CLOSED) {
     // If we are closed, then we shouldn't process new network data.
     // TODO: we might want to process network data if we decide that we should
@@ -1029,49 +1029,74 @@ bool QuicClientTransport::shouldOnlyNotify() {
 void QuicClientTransport::onNotifyDataAvailable() noexcept {
   DCHECK(conn_) << "trying to receive packets without a connection";
   auto readBufferSize = conn_->transportSettings.maxRecvPacketSize;
-  auto readBuffer = folly::IOBuf::create(readBufferSize);
-  struct iovec vec;
-  vec.iov_base = readBuffer->writableData();
-  vec.iov_len = readBufferSize;
-
-  struct sockaddr_storage addrStorage;
-  socklen_t addrLen = sizeof(addrStorage);
-  memset(&addrStorage, 0, size_t(addrLen));
-  auto rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
-  rawAddr->sa_family = socket_->address().getFamily();
-
-  struct msghdr msg;
-  memset(&msg, 0, sizeof(msg));
-  msg.msg_name = rawAddr;
-  msg.msg_namelen = addrLen;
-  msg.msg_iov = &vec;
-  msg.msg_iovlen = 1;
-
-  ssize_t ret = socket_->recvmsg(&msg, 0);
-  if (ret < 0) {
-    if (errno == EAGAIN || errno == EWOULDBLOCK) {
-      return;
-    }
-    return onReadError(folly::AsyncSocketException(
-        folly::AsyncSocketException::INTERNAL_ERROR,
-        "::recvmsg() failed",
-        errno));
-  }
-  size_t bytesRead = size_t(ret);
-  folly::SocketAddress server;
-  server.setFromSockaddr(rawAddr, addrLen);
-  VLOG(10) << "Got data from socket peer=" << server << " len=" << bytesRead;
-  // TODO: we can get better receive time accuracy than this, with
-  // SO_TIMESTAMP or SIOCGSTAMP.
-  auto packetReceiveTime = Clock::now();
-  readBuffer->append(bytesRead);
-  QUIC_TRACE(udp_recvd, *conn_, bytesRead);
-  if (conn_->qLogger) {
-    conn_->qLogger->addDatagramReceived(bytesRead);
-  }
-  NetworkData networkData(std::move(readBuffer), packetReceiveTime);
-  onNetworkData(server, std::move(networkData));
+  const size_t numPackets = conn_->transportSettings.maxRecvBatchSize;
+
+  NetworkData networkData;
+  networkData.packets.reserve(numPackets);
+  size_t totalData = 0;
+  folly::Optional<folly::SocketAddress> server;
+
+  for (size_t packetNum = 0; packetNum < numPackets; ++packetNum) {
+    // We create 1 buffer per packet so that it is not shared, this enables
+    // us to decrypt in place. If the fizz decrypt api could decrypt in-place
+    // even if shared, then we could allocate one giant IOBuf here.
+    Buf readBuffer = folly::IOBuf::createCombined(readBufferSize);
+    struct iovec vec {};
+    vec.iov_base = readBuffer->writableData();
+    vec.iov_len = readBufferSize;
+
+    sockaddr* rawAddr{nullptr};
+    struct sockaddr_storage addrStorage {};
+    socklen_t addrLen{sizeof(addrStorage)};
+    if (!server) {
+      rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
+      rawAddr->sa_family = socket_->address().getFamily();
+    }
+
+    struct msghdr msg {};
+    msg.msg_name = rawAddr;
+    msg.msg_namelen = size_t(addrLen);
+    msg.msg_iov = &vec;
+    msg.msg_iovlen = 1;
+
+    ssize_t ret = socket_->recvmsg(&msg, 0);
+    if (ret < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK || packetNum != 0) {
+        // If we got a retriable error, or we had previously processed
+        // a packet successfully, let's use that packet.
+        break;
+      }
+      return onReadError(folly::AsyncSocketException(
+          folly::AsyncSocketException::INTERNAL_ERROR,
+          "::recvmsg() failed",
+          errno));
+    } else if (ret == 0) {
+      break;
+    }
+    size_t bytesRead = size_t(ret);
+    totalData += bytesRead;
+    if (!server) {
+      server = folly::SocketAddress();
+      server->setFromSockaddr(rawAddr, addrLen);
+    }
+    VLOG(10) << "Got data from socket peer=" << *server << " len=" << bytesRead;
+    readBuffer->append(bytesRead);
+    networkData.packets.emplace_back(std::move(readBuffer));
+    QUIC_TRACE(udp_recvd, *conn_, bytesRead);
+    if (conn_->qLogger) {
+      conn_->qLogger->addDatagramReceived(bytesRead);
+    }
+  }
+
+  if (networkData.packets.empty()) {
+    return;
+  }
+  DCHECK(server.hasValue());
+  // TODO: we can get better receive time accuracy than this, with
+  // SO_TIMESTAMP or SIOCGSTAMP.
+  auto packetReceiveTime = Clock::now();
+  networkData.receiveTimePoint = packetReceiveTime;
+  networkData.totalData = totalData;
+  onNetworkData(*server, std::move(networkData));
 }

 void QuicClientTransport::


@@ -89,8 +89,9 @@ class QuicClientTransport
   bool isTLSResumed() const;

   // From QuicTransportBase
-  void onReadData(const folly::SocketAddress& peer, NetworkData&& networkData)
-      override;
+  void onReadData(
+      const folly::SocketAddress& peer,
+      NetworkDataSingle&& networkData) override;
   void writeData() override;
   void closeTransport() override;
   void unbindConnection() override;
@@ -153,7 +154,7 @@ class QuicClientTransport

   void processUDPData(
       const folly::SocketAddress& peer,
-      NetworkData&& networkData);
+      NetworkDataSingle&& networkData);

   void processPacketData(
       const folly::SocketAddress& peer,


@@ -1287,6 +1287,17 @@ class FakeOneRttHandshakeLayer : public ClientHandshake {

 class QuicClientTransportTest : public Test {
  public:
+  struct TestReadData {
+    std::unique_ptr<folly::IOBuf> data;
+    folly::SocketAddress addr;
+    folly::Optional<int> err;
+
+    TestReadData(folly::ByteRange dataIn, const folly::SocketAddress& addrIn)
+        : data(folly::IOBuf::copyBuffer(dataIn)), addr(addrIn) {}
+
+    explicit TestReadData(int errIn) : err(errIn) {}
+  };
+
   QuicClientTransportTest() : eventbase_(std::make_unique<folly::EventBase>()) {
     auto socket =
         std::make_unique<folly::test::MockAsyncUDPSocket>(eventbase_.get());
@@ -1306,6 +1317,30 @@ class QuicClientTransportTest : public Test {
     ON_CALL(*sock, resumeRead(_))
         .WillByDefault(SaveArg<0>(&networkReadCallback));
     ON_CALL(*sock, address()).WillByDefault(ReturnRef(serverAddr));
+    ON_CALL(*sock, recvmsg(_, _))
+        .WillByDefault(Invoke([&](struct msghdr* msg, int) -> ssize_t {
+          DCHECK_GT(msg->msg_iovlen, 0);
+          if (socketReads.empty()) {
+            errno = EAGAIN;
+            return -1;
+          }
+          if (socketReads[0].err) {
+            errno = *socketReads[0].err;
+            return -1;
+          }
+          auto testData = std::move(socketReads[0].data);
+          testData->coalesce();
+          size_t testDataLen = testData->length();
+          memcpy(
+              msg->msg_iov[0].iov_base, testData->data(), testData->length());
+          if (msg->msg_name) {
+            socklen_t msg_len = socketReads[0].addr.getAddress(
+                static_cast<sockaddr_storage*>(msg->msg_name));
+            msg->msg_namelen = msg_len;
+          }
+          socketReads.pop_front();
+          return testDataLen;
+        }));

     EXPECT_EQ(client->getConn().selfConnectionIds.size(), 1);
     EXPECT_EQ(
         client->getConn().selfConnectionIds[0].connId,
@@ -1483,23 +1518,19 @@ class QuicClientTransportTest : public Test {
       folly::ByteRange data,
       bool writes = true) {
     ASSERT_TRUE(networkReadCallback);
-    EXPECT_CALL(*sock, recvmsg(_, _))
-        .WillOnce(Invoke([&](struct msghdr* msg, int) {
-          DCHECK_GT(msg->msg_iovlen, 0);
-          memcpy(msg->msg_iov[0].iov_base, data.data(), data.size());
-          if (msg->msg_name) {
-            socklen_t msg_len =
-                addr.getAddress(static_cast<sockaddr_storage*>(msg->msg_name));
-            msg->msg_namelen = msg_len;
-          }
-          return data.size();
-        }));
+    socketReads.emplace_back(TestReadData(data, addr));
     networkReadCallback->onNotifyDataAvailable();
     if (writes) {
       loopForWrites();
     }
   }

+  void deliverNetworkError(int err) {
+    ASSERT_TRUE(networkReadCallback);
+    socketReads.emplace_back(TestReadData(err));
+    networkReadCallback->onNotifyDataAvailable();
+  }
+
   void deliverDataWithoutErrorCheck(folly::ByteRange data, bool writes = true) {
     deliverDataWithoutErrorCheck(serverAddr, std::move(data), writes);
   }
@@ -1650,6 +1681,7 @@ class QuicClientTransportTest : public Test {
  protected:
   std::vector<std::unique_ptr<folly::IOBuf>> socketWrites;
+  std::deque<TestReadData> socketReads;
   MockDeliveryCallback deliveryCallback;
   MockReadCallback readCb;
   MockConnectionCallback clientConnCallback;
@@ -2734,6 +2766,140 @@ TEST_F(QuicClientTransportAfterStartTest, ReadStream) {
   client->close(folly::none);
 }

+TEST_F(QuicClientTransportAfterStartTest, ReadStreamMultiplePackets) {
+  StreamId streamId = client->createBidirectionalStream().value();
+  client->setReadCallback(streamId, &readCb);
+  bool dataDelivered = false;
+  auto data = IOBuf::copyBuffer("hello");
+  auto expected = data->clone();
+  expected->prependChain(data->clone());
+  EXPECT_CALL(readCb, readAvailable(streamId)).WillOnce(Invoke([&](auto) {
+    auto readData = client->read(streamId, 1000);
+    auto copy = readData->first->clone();
+    LOG(INFO) << "Client received data="
+              << copy->clone()->moveToFbString().toStdString()
+              << " on stream=" << streamId;
+    EXPECT_EQ(
+        copy->moveToFbString().toStdString(),
+        expected->clone()->moveToFbString().toStdString());
+    dataDelivered = true;
+    eventbase_->terminateLoopSoon();
+  }));
+  auto packet1 = packetToBuf(createStreamPacket(
+      *serverChosenConnId /* src */,
+      *originalConnId /* dest */,
+      appDataPacketNum++,
+      streamId,
+      *data,
+      0 /* cipherOverhead */,
+      0 /* largestAcked */,
+      folly::none /* longHeaderOverride */,
+      false /* eof */));
+  auto packet2 = packetToBuf(createStreamPacket(
+      *serverChosenConnId /* src */,
+      *originalConnId /* dest */,
+      appDataPacketNum++,
+      streamId,
+      *data,
+      0 /* cipherOverhead */,
+      0 /* largestAcked */,
+      folly::none /* longHeaderOverride */,
+      true /* eof */,
+      folly::none /* shortHeaderOverride */,
+      data->length() /* offset */));
+  socketReads.emplace_back(TestReadData(packet1->coalesce(), serverAddr));
+  deliverData(packet2->coalesce());
+  if (!dataDelivered) {
+    eventbase_->loopForever();
+  }
+  EXPECT_TRUE(dataDelivered);
+  client->close(folly::none);
+}
+
+TEST_F(QuicClientTransportAfterStartTest, ReadStreamWithRetriableError) {
+  StreamId streamId = client->createBidirectionalStream().value();
+  client->setReadCallback(streamId, &readCb);
+  EXPECT_CALL(readCb, readAvailable(_)).Times(0);
+  EXPECT_CALL(readCb, readError(_, _)).Times(0);
+  deliverNetworkError(EAGAIN);
+  client->setReadCallback(streamId, nullptr);
+  client->close(folly::none);
+}
+
+TEST_F(QuicClientTransportAfterStartTest, ReadStreamWithNonRetriableError) {
+  StreamId streamId = client->createBidirectionalStream().value();
+  client->setReadCallback(streamId, &readCb);
+  EXPECT_CALL(readCb, readAvailable(_)).Times(0);
+  // TODO: we currently do not close the socket, but maybe we can in the future.
+  EXPECT_CALL(readCb, readError(_, _)).Times(0);
+  deliverNetworkError(EBADF);
+  client->setReadCallback(streamId, nullptr);
+  client->close(folly::none);
+}
+
+TEST_F(
+    QuicClientTransportAfterStartTest,
+    ReadStreamMultiplePacketsWithRetriableError) {
+  StreamId streamId = client->createBidirectionalStream().value();
+  client->setReadCallback(streamId, &readCb);
+  bool dataDelivered = false;
+  auto expected = IOBuf::copyBuffer("hello");
+  EXPECT_CALL(readCb, readAvailable(streamId)).WillOnce(Invoke([&](auto) {
+    auto readData = client->read(streamId, 1000);
+    auto copy = readData->first->clone();
+    LOG(INFO) << "Client received data=" << copy->moveToFbString().toStdString()
+              << " on stream=" << streamId;
+    EXPECT_TRUE(folly::IOBufEqualTo()((*readData).first, expected));
+    dataDelivered = true;
+    eventbase_->terminateLoopSoon();
+  }));
+  auto packet = packetToBuf(createStreamPacket(
+      *serverChosenConnId /* src */,
+      *originalConnId /* dest */,
+      appDataPacketNum++,
+      streamId,
+      *expected,
+      0 /* cipherOverhead */,
+      0 /* largestAcked */));
+  socketReads.emplace_back(TestReadData(packet->coalesce(), serverAddr));
+  deliverNetworkError(EAGAIN);
+  if (!dataDelivered) {
+    eventbase_->loopForever();
+  }
+  EXPECT_TRUE(dataDelivered);
+  client->close(folly::none);
+}
+
+TEST_F(
+    QuicClientTransportAfterStartTest,
+    ReadStreamMultiplePacketsWithNonRetriableError) {
+  StreamId streamId = client->createBidirectionalStream().value();
+  client->setReadCallback(streamId, &readCb);
+  auto expected = IOBuf::copyBuffer("hello");
+  EXPECT_CALL(readCb, readAvailable(streamId)).Times(0);
+  // TODO: we currently do not close the socket, but maybe we can in the future.
+  EXPECT_CALL(readCb, readError(_, _)).Times(0);
+  auto packet = packetToBuf(createStreamPacket(
+      *serverChosenConnId /* src */,
+      *originalConnId /* dest */,
+      appDataPacketNum++,
+      streamId,
+      *expected,
+      0 /* cipherOverhead */,
+      0 /* largestAcked */));
+  socketReads.emplace_back(TestReadData(packet->coalesce(), serverAddr));
+  deliverNetworkError(EBADF);
+  client->setReadCallback(streamId, nullptr);
+}
+
 TEST_F(QuicClientTransportAfterStartTest, RecvNewConnectionIdValid) {
   auto& conn = client->getNonConstConn();
   conn.transportSettings.selfActiveConnectionIdLimit = 1;


@@ -97,7 +97,7 @@ void QuicServerTransport::setCongestionControllerFactory(

 void QuicServerTransport::onReadData(
     const folly::SocketAddress& peer,
-    NetworkData&& networkData) {
+    NetworkDataSingle&& networkData) {
   ServerEvents::ReadData readData;
   readData.peer = peer;
   readData.networkData = std::move(networkData);
@@ -341,7 +341,10 @@ void QuicServerTransport::processPendingData(bool async) {
     auto serverPtr = static_cast<QuicServerTransport*>(self.get());
     for (auto& pendingPacket : *pendingData) {
       serverPtr->onNetworkData(
-          pendingPacket.peer, std::move(pendingPacket.networkData));
+          pendingPacket.peer,
+          NetworkData(
+              std::move(pendingPacket.networkData.data),
+              pendingPacket.networkData.receiveTimePoint));
       if (serverPtr->closeState_ == CloseState::CLOSED) {
         // The pending data could potentially contain a connection close, or
         // the app could have triggered a connection close with an error. It


@@ -91,8 +91,9 @@ class QuicServerTransport
   virtual void setClientConnectionId(const ConnectionId& clientConnectionId);

   // From QuicTransportBase
-  void onReadData(const folly::SocketAddress& peer, NetworkData&& networkData)
-      override;
+  void onReadData(
+      const folly::SocketAddress& peer,
+      NetworkDataSingle&& networkData) override;
   void writeData() override;
   void closeTransport() override;
   void unbindConnection() override;


@@ -298,8 +298,9 @@ void QuicServerWorker::forwardNetworkData(
     if (packetForwardingEnabled_ && !isForwardedData) {
       VLOG(3) << "Forwarding packet with unknown connId version from client="
               << client << " to another process";
+      auto recvTime = networkData.receiveTimePoint;
       takeoverPktHandler_.forwardPacketToAnotherServer(
-          client, std::move(networkData.data), networkData.receiveTimePoint);
+          client, std::move(networkData).moveAllData(), recvTime);
       QUIC_STATS(infoCallback_, onPacketForwarded);
       return;
     } else {
@@ -370,8 +371,7 @@ void QuicServerWorker::dispatchPacketData(
       // This could be a new connection, add it in the map
       // verify that the initial packet is at least min initial bytes
       // to avoid amplification attacks.
-      if (networkData.data->computeChainDataLength() <
-          kMinInitialPacketSize) {
+      if (networkData.totalData < kMinInitialPacketSize) {
         // Don't even attempt to forward the packet, just drop it.
         VLOG(3) << "Dropping small initial packet from client=" << client;
         QUIC_STATS(
@@ -476,8 +476,9 @@ void QuicServerWorker::dispatchPacketData(
     VLOG(4) << "Forwarding packet from client=" << client
             << " to another process, workerId=" << (uint32_t)workerId_
             << ", processId_=" << (uint32_t) static_cast<uint8_t>(processId_);
+    auto recvTime = networkData.receiveTimePoint;
     takeoverPktHandler_.forwardPacketToAnotherServer(
-        client, std::move(networkData.data), networkData.receiveTimePoint);
+        client, std::move(networkData).moveAllData(), recvTime);
     QUIC_STATS(infoCallback_, onPacketForwarded);
   }
@@ -490,7 +491,7 @@ void QuicServerWorker::sendResetPacket(
     // Only send resets in response to short header packets.
     return;
   }
-  uint16_t packetSize = networkData.data->computeChainDataLength();
+  uint16_t packetSize = networkData.totalData;
   uint16_t maxResetPacketSize = std::min<uint16_t>(
       std::max<uint16_t>(kMinStatelessPacketSize, packetSize),
       kDefaultUDPSendPacketLen);


@@ -481,7 +481,7 @@ void handleCipherUnavailable(
   }
   ServerEvents::ReadData pendingReadData;
   pendingReadData.peer = readData.peer;
-  pendingReadData.networkData = NetworkData(
+  pendingReadData.networkData = NetworkDataSingle(
       std::move(originalData->packet), readData.networkData.receiveTimePoint);
   pendingData->emplace_back(std::move(pendingReadData));
   VLOG(10) << "Adding pending data to "


@@ -43,7 +43,7 @@ enum ServerState {
 struct ServerEvents {
   struct ReadData {
     folly::SocketAddress peer;
-    NetworkData networkData;
+    NetworkDataSingle networkData;
   };

   struct Close {};


@@ -32,9 +32,15 @@ using PacketDropReason = QuicTransportStatsCallback::PacketDropReason;
 } // namespace

 namespace test {

-MATCHER_P(BufMatches, buf, "") {
-  folly::IOBufEqualTo eq;
-  return eq(*arg, buf);
+MATCHER_P(NetworkDataMatches, networkData, "") {
+  for (size_t i = 0; i < arg.packets.size(); ++i) {
+    folly::IOBufEqualTo eq;
+    bool equals = eq(*arg.packets[i], networkData);
+    if (equals) {
+      return true;
+    }
+  }
+  return false;
 }

 class TestingEventBaseObserver : public folly::EventBaseObserver {
@@ -215,7 +221,7 @@ void QuicServerWorkerTest::createQuicConnection(
     transport = transportOverride;
   }
   expectConnectionCreation(addr, connId, transport);
-  EXPECT_CALL(*transport, onNetworkData(addr, BufMatches(*data)));
+  EXPECT_CALL(*transport, onNetworkData(addr, NetworkDataMatches(*data)));
   worker_->dispatchPacketData(
       addr, std::move(routingData), NetworkData(data->clone(), Clock::now()));
@@ -364,7 +370,8 @@ TEST_F(QuicServerWorkerTest, QuicServerMultipleConnIdsRouting) {
   EXPECT_EQ(addrMap.count(std::make_pair(kClientAddr, connId)), 0);

   // routing by connid after connid available.
-  EXPECT_CALL(*transport_, onNetworkData(kClientAddr, BufMatches(*data)))
+  EXPECT_CALL(
+      *transport_, onNetworkData(kClientAddr, NetworkDataMatches(*data)))
       .Times(1);
   RoutingData routingData2(
       HeaderForm::Short, false, false, connId, folly::none);
@@ -380,7 +387,8 @@ TEST_F(QuicServerWorkerTest, QuicServerMultipleConnIdsRouting) {
   EXPECT_EQ(connIdMap.size(), 2);

-  EXPECT_CALL(*transport_, onNetworkData(kClientAddr, BufMatches(*data)))
+  EXPECT_CALL(
+      *transport_, onNetworkData(kClientAddr, NetworkDataMatches(*data)))
      .Times(1);
   RoutingData routingData3(
       HeaderForm::Short, false, false, connId2, folly::none);
@@ -449,7 +457,8 @@ TEST_F(QuicServerWorkerTest, QuicServerNewConnection) {
       0);

   // routing by connid after connid available.
-  EXPECT_CALL(*transport_, onNetworkData(kClientAddr, BufMatches(*data)));
+  EXPECT_CALL(
+      *transport_, onNetworkData(kClientAddr, NetworkDataMatches(*data)));
   RoutingData routingData2(
       HeaderForm::Short,
       false,
@@ -1012,7 +1021,8 @@ TEST_F(QuicServerWorkerTakeoverTest, QuicServerTakeoverProcessForwardedPkt) {
     EXPECT_EQ(addr.getPort(), clientAddr.getPort());
     // the original data should be extracted after processing takeover
     // protocol related information
-    EXPECT_TRUE(eq(*data, *(networkData->data)));
+    EXPECT_EQ(networkData->packets.size(), 1);
+    EXPECT_TRUE(eq(*data, *(networkData->packets[0])));
     EXPECT_TRUE(isForwardedData);
   };
   EXPECT_CALL(*takeoverWorkerCb_, routeDataToWorkerLong(_, _, _, _))
@@ -1182,8 +1192,11 @@ class QuicServerTest : public Test {
               transportSettings.advertisedInitialConnectionWindowSize);
         }));
     ON_CALL(*transport, onNetworkData(_, _))
-        .WillByDefault(Invoke([&, expected = data.get()](auto, auto buf) {
-          EXPECT_TRUE(folly::IOBufEqualTo()(*buf, *expected));
+        .WillByDefault(
+            Invoke([&, expected = data.get()](auto, const auto& networkData) {
+              EXPECT_GT(networkData.packets.size(), 0);
+              EXPECT_TRUE(
+                  folly::IOBufEqualTo()(*networkData.packets[0], *expected));
               std::unique_lock<std::mutex> lg(m);
               calledOnNetworkData = true;
               cv.notify_one();
@@ -1361,8 +1374,11 @@ class QuicServerTakeoverTest : public Test {
           EXPECT_EQ(params.workerId, 0);
         }));
     EXPECT_CALL(*transport, onNetworkData(_, _))
-        .WillOnce(Invoke([&, expected = data.get()](auto, auto buf) {
-          EXPECT_TRUE(folly::IOBufEqualTo()(*buf, *expected));
+        .WillOnce(
+            Invoke([&, expected = data.get()](auto, const auto& networkData) {
+              EXPECT_GT(networkData.packets.size(), 0);
+              EXPECT_TRUE(
+                  folly::IOBufEqualTo()(*networkData.packets[0], *expected));
               baton.post();
             }));
     return transport;
@@ -1465,8 +1481,11 @@ class QuicServerTakeoverTest : public Test {
     // onNetworkData(_, _) shouldn't be called on the newServer_ transport,
    // but should be routed to oldServer_
     EXPECT_CALL(*transportCbForOldServer, onNetworkData(_, _))
-        .WillOnce(Invoke([&, expected = data.get()](auto, auto buf) {
-          EXPECT_TRUE(folly::IOBufEqualTo()(*buf, *expected));
+        .WillOnce(
+            Invoke([&, expected = data.get()](auto, const auto& networkData) {
+              EXPECT_GT(networkData.packets.size(), 0);
+              EXPECT_TRUE(
+                  folly::IOBufEqualTo()(*networkData.packets[0], *expected));
               b1.post();
             }));
     // new quic server receives the packet and forwards it
@@ -1885,8 +1904,11 @@ TEST_F(QuicServerTest, ZeroRttPacketRoute) {
     EXPECT_CALL(*transport, accept());
     // post baton upon receiving the data
     EXPECT_CALL(*transport, onNetworkData(_, _))
-        .WillOnce(Invoke([&, expected = data.get()](auto, auto buf) {
-          EXPECT_TRUE(folly::IOBufEqualTo()(*buf, *expected));
+        .WillOnce(
+            Invoke([&, expected = data.get()](auto, const auto& networkData) {
+              EXPECT_GT(networkData.packets.size(), 0);
+              EXPECT_TRUE(
+                  folly::IOBufEqualTo()(*networkData.packets[0], *expected));
               b.post();
             }));
     return transport;
@@ -1926,9 +1948,11 @@ TEST_F(QuicServerTest, ZeroRttPacketRoute) {
   data = std::move(packet);
   folly::Baton<> b1;
   auto verifyZeroRtt = [&](
-      const folly::SocketAddress& peer, const folly::IOBuf* rcvdPkt) noexcept {
+      const folly::SocketAddress& peer,
+      const NetworkData& networkData) noexcept {
+    EXPECT_GT(networkData.packets.size(), 0);
     EXPECT_EQ(peer, reader->getSocket().address());
-    EXPECT_TRUE(folly::IOBufEqualTo()(*rcvdPkt, *data));
+    EXPECT_TRUE(folly::IOBufEqualTo()(*data, *networkData.packets[0]));
     b1.post();
   };
   EXPECT_CALL(*transport, onNetworkData(_, _)).WillOnce(Invoke(verifyZeroRtt));


@@ -34,12 +34,47 @@
 namespace quic {

 struct NetworkData {
-  Buf data;
   TimePoint receiveTimePoint;
+  std::vector<std::unique_ptr<folly::IOBuf>> packets;
+  size_t totalData{0};
+
   NetworkData() = default;
   NetworkData(Buf&& buf, const TimePoint& receiveTime)
-      : data(std::move(buf)), receiveTimePoint(receiveTime) {}
+      : receiveTimePoint(receiveTime) {
+    if (buf) {
+      totalData = buf->computeChainDataLength();
+      packets.emplace_back(std::move(buf));
+    }
+  }
+
+  std::unique_ptr<folly::IOBuf> moveAllData() && {
+    std::unique_ptr<folly::IOBuf> buf;
+    for (size_t i = 0; i < packets.size(); ++i) {
+      if (buf) {
+        buf->prependChain(std::move(packets[i]));
+      } else {
+        buf = std::move(packets[i]);
+      }
+    }
+    return buf;
+  }
+};
+
+struct NetworkDataSingle {
+  std::unique_ptr<folly::IOBuf> data;
+  TimePoint receiveTimePoint;
+  size_t totalData{0};
+
+  NetworkDataSingle() = default;
+  NetworkDataSingle(
+      std::unique_ptr<folly::IOBuf> buf,
+      const TimePoint& receiveTime)
+      : data(std::move(buf)), receiveTimePoint(receiveTime) {
+    if (data) {
+      totalData += data->computeChainDataLength();
+    }
+  }
 };

 /**
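A usage sketch for the new NetworkData above (a hypothetical snippet, assuming
folly and the header defining NetworkData are included): each recvmsg read
lands in its own IOBuf, and the rvalue-qualified moveAllData() collapses the
batch into a single IOBuf chain, which is the shape the takeover-forwarding
path in QuicServerWorker needs.

    #include <folly/io/IOBuf.h>
    #include <memory>

    void forwardingShapeExample() {
      quic::NetworkData networkData;
      networkData.packets.emplace_back(folly::IOBuf::copyBuffer("packet1"));
      networkData.packets.emplace_back(folly::IOBuf::copyBuffer("packet2"));
      networkData.totalData = 14; // 7 bytes per packet

      // moveAllData() is rvalue-qualified because it consumes packets;
      // "all" becomes a two-element chain: "packet1" -> "packet2".
      std::unique_ptr<folly::IOBuf> all = std::move(networkData).moveAllData();
    }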

@@ -115,6 +115,10 @@ struct TransportSettings {
   // The active_connection_id_limit that is sent to the peer.
   uint64_t selfActiveConnectionIdLimit{0};
+
+  // Maximum size of the batch that should be used when receiving packets from
+  // the kernel in one event loop.
+  size_t maxRecvBatchSize{5};
 };

 } // namespace quic
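
A sketch of tuning the new knob (assuming an already-constructed client
transport; setTransportSettings is the existing setter on QuicTransportBase,
and 16 is an arbitrary example value):

    quic::TransportSettings settings;
    settings.maxRecvBatchSize = 16; // read up to 16 packets per epoll callback
    client->setTransportSettings(settings);

With the default of 5 above, each socket-readable callback reads at most five
datagrams before control returns to the event loop.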