1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Add ReadDatagram struct and update readDatagrams callback

Summary: In order to provide more information about datagrams when we receive them, I'm adding a wrapper around the BufQueue that we currently hold so we can include receiveTimePoint and any other metadata we might want to expose for the Datagram API.

Reviewed By: mjoras

Differential Revision: D33994358

fbshipit-source-id: 805f182cd350908320639bc07eb6f3b7349bbc05
This commit is contained in:
Elijah Staple
2022-02-10 16:44:04 -08:00
committed by Facebook GitHub Bot
parent 7e4c34bb42
commit b57802721e
11 changed files with 105 additions and 14 deletions

View File

@@ -1248,7 +1248,14 @@ class QuicSocket {
* Returns the currently available received Datagrams.
* Returns all datagrams if atMost is 0.
*/
virtual folly::Expected<std::vector<Buf>, LocalErrorCode> readDatagrams(
virtual folly::Expected<std::vector<ReadDatagram>, LocalErrorCode>
readDatagrams(size_t atMost = 0) = 0;
/**
* Returns the currently available received Datagram IOBufs.
* Returns all datagrams if atMost is 0.
*/
virtual folly::Expected<std::vector<Buf>, LocalErrorCode> readDatagramBufs(
size_t atMost = 0) = 0;
};
} // namespace quic

View File

@@ -2922,8 +2922,31 @@ folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::writeDatagram(
return folly::unit;
}
folly::Expected<std::vector<Buf>, LocalErrorCode>
folly::Expected<std::vector<ReadDatagram>, LocalErrorCode>
QuicTransportBase::readDatagrams(size_t atMost) {
CHECK(conn_);
auto datagrams = &conn_->datagramState.readBuffer;
if (closeState_ != CloseState::OPEN) {
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
}
if (atMost == 0) {
atMost = datagrams->size();
} else {
atMost = std::min(atMost, datagrams->size());
}
std::vector<ReadDatagram> retDatagrams;
retDatagrams.reserve(atMost);
std::transform(
datagrams->begin(),
datagrams->begin() + atMost,
std::back_inserter(retDatagrams),
[](ReadDatagram& dg) { return std::move(dg); });
datagrams->erase(datagrams->begin(), datagrams->begin() + atMost);
return retDatagrams;
}
folly::Expected<std::vector<Buf>, LocalErrorCode>
QuicTransportBase::readDatagramBufs(size_t atMost) {
CHECK(conn_);
auto datagrams = &conn_->datagramState.readBuffer;
if (closeState_ != CloseState::OPEN) {
@@ -2940,7 +2963,7 @@ QuicTransportBase::readDatagrams(size_t atMost) {
datagrams->begin(),
datagrams->begin() + atMost,
std::back_inserter(retDatagrams),
[](BufQueue& bq) { return bq.move(); });
[](ReadDatagram& dg) { return dg.bufQueue().move(); });
datagrams->erase(datagrams->begin(), datagrams->begin() + atMost);
return retDatagrams;
}

View File

@@ -678,7 +678,14 @@ class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver {
* Returns the currently available received Datagrams.
* Returns all datagrams if atMost is 0.
*/
folly::Expected<std::vector<Buf>, LocalErrorCode> readDatagrams(
folly::Expected<std::vector<ReadDatagram>, LocalErrorCode> readDatagrams(
size_t atMost = 0) override;
/**
* Returns the currently available received Datagram IOBufs.
* Returns all datagrams if atMost is 0.
*/
folly::Expected<std::vector<Buf>, LocalErrorCode> readDatagramBufs(
size_t atMost = 0) override;
/**

View File

@@ -308,6 +308,9 @@ class MockQuicSocket : public QuicSocket {
MOCK_METHOD1(writeDatagram, WriteResult(SharedBuf));
MOCK_METHOD1(
readDatagrams,
folly::Expected<std::vector<ReadDatagram>, LocalErrorCode>(size_t));
MOCK_METHOD1(
readDatagramBufs,
folly::Expected<std::vector<Buf>, LocalErrorCode>(size_t));
};
} // namespace quic

View File

@@ -255,7 +255,7 @@ class TestQuicTransport
} else if (type == TestFrameType::DATAGRAM) {
auto buffer = decodeDatagramFrame(cursor);
auto frame = DatagramFrame(buffer.second, std::move(buffer.first));
handleDatagram(*conn_, frame);
handleDatagram(*conn_, frame, data.receiveTimePoint);
} else {
auto buffer = decodeStreamBuffer(cursor);
QuicStreamState* stream = conn_->streamManager->getStream(buffer.first);
@@ -372,10 +372,10 @@ class TestQuicTransport
updateReadLooper();
}
void addDatagram(Buf data) {
void addDatagram(Buf data, TimePoint recvTime = Clock::now()) {
auto buf = encodeDatagramFrame(std::move(data));
SocketAddress addr("127.0.0.1", 1000);
onNetworkData(addr, NetworkData(std::move(buf), Clock::now()));
onNetworkData(addr, NetworkData(std::move(buf), recvTime));
}
void closeStream(StreamId id) {
@@ -3866,13 +3866,29 @@ TEST_F(QuicTransportImplTest, ZeroLengthDatagram) {
transport->addDatagram(folly::IOBuf::copyBuffer(""));
EXPECT_CALL(datagramCb, onDatagramsAvailable());
transport->driveReadCallbacks();
auto datagrams = transport->readDatagrams();
auto datagrams = transport->readDatagramBufs();
EXPECT_FALSE(datagrams.hasError());
EXPECT_EQ(datagrams->size(), 1);
EXPECT_TRUE(datagrams->front() != nullptr);
EXPECT_EQ(datagrams->front()->computeChainDataLength(), 0);
}
TEST_F(QuicTransportImplTest, ZeroLengthDatagramBufs) {
NiceMock<MockDatagramCallback> datagramCb;
transport->enableDatagram();
transport->setDatagramCallback(&datagramCb);
auto recvTime = Clock::now() + 5000ns;
transport->addDatagram(folly::IOBuf::copyBuffer(""), recvTime);
EXPECT_CALL(datagramCb, onDatagramsAvailable());
transport->driveReadCallbacks();
auto datagrams = transport->readDatagrams();
EXPECT_FALSE(datagrams.hasError());
EXPECT_EQ(datagrams->size(), 1);
EXPECT_TRUE(datagrams->front().bufQueue().front() != nullptr);
EXPECT_EQ(datagrams->front().receiveTimePoint(), recvTime);
EXPECT_EQ(datagrams->front().bufQueue().front()->computeChainDataLength(), 0);
}
TEST_F(QuicTransportImplTest, Cmsgs) {
transport->setServerConnectionId();
folly::SocketOptionMap cmsgs;

View File

@@ -592,7 +592,7 @@ void QuicClientTransport::processPacketData(
// Datagram isn't retransmittable. But we would like to ack them early.
// So, make Datagram frames count towards ack policy
pktHasRetransmittableData = true;
handleDatagram(*conn_, frame);
handleDatagram(*conn_, frame, receiveTimePoint);
break;
}
default:

View File

@@ -4711,6 +4711,7 @@ TEST_F(
auto payload = client->getConn()
.datagramState.readBuffer[0]
.bufQueue()
.front()
->clone()
->moveToFbString();

View File

@@ -1167,7 +1167,7 @@ void onServerReadDataFromOpen(
// Datagram isn't retransmittable. But we would like to ack them
// early. So, make Datagram frames count towards ack policy
pktHasRetransmittableData = true;
handleDatagram(conn, frame);
handleDatagram(conn, frame, readData.networkData.receiveTimePoint);
break;
}
default: {

View File

@@ -9,7 +9,10 @@
namespace quic {
void handleDatagram(QuicConnectionStateBase& conn, DatagramFrame& frame) {
void handleDatagram(
QuicConnectionStateBase& conn,
DatagramFrame& frame,
TimePoint recvTimePoint) {
// TODO(lniccolini) update max datagram frame size
// https://github.com/quicwg/datagram/issues/3
// For now, max_datagram_size > 0 means the peer supports datagram frames
@@ -29,7 +32,8 @@ void handleDatagram(QuicConnectionStateBase& conn, DatagramFrame& frame) {
}
}
QUIC_STATS(conn.statsCallback, onDatagramRead, frame.data.chainLength());
conn.datagramState.readBuffer.emplace_back(std::move(frame.data));
conn.datagramState.readBuffer.emplace_back(
recvTimePoint, std::move(frame.data));
}
} // namespace quic

View File

@@ -15,6 +15,9 @@ namespace quic {
/**
* Processes a Datagram frame
*/
void handleDatagram(QuicConnectionStateBase& conn, DatagramFrame& frame);
void handleDatagram(
QuicConnectionStateBase& conn,
DatagramFrame& frame,
TimePoint recvTimePoint);
} // namespace quic

View File

@@ -390,6 +390,33 @@ class CongestionControllerFactory;
class LoopDetectorCallback;
class PendingPathRateLimiter;
struct ReadDatagram {
ReadDatagram(TimePoint recvTimePoint, BufQueue data)
: receiveTimePoint_{recvTimePoint}, buf_{std::move(data)} {}
[[nodiscard]] TimePoint receiveTimePoint() const noexcept {
return receiveTimePoint_;
}
[[nodiscard]] BufQueue& bufQueue() noexcept {
return buf_;
}
[[nodiscard]] const BufQueue& bufQueue() const noexcept {
return buf_;
}
// Move only to match BufQueue behavior
ReadDatagram(ReadDatagram&& other) noexcept = default;
ReadDatagram& operator=(ReadDatagram&& other) = default;
ReadDatagram(const ReadDatagram&) = delete;
ReadDatagram& operator=(const ReadDatagram&) = delete;
private:
TimePoint receiveTimePoint_;
BufQueue buf_;
};
struct QuicConnectionStateBase : public folly::DelayedDestruction {
virtual ~QuicConnectionStateBase() override = default;
@@ -798,7 +825,7 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
uint32_t maxReadBufferSize{kDefaultMaxDatagramsBuffered};
uint32_t maxWriteBufferSize{kDefaultMaxDatagramsBuffered};
// Buffers Incoming Datagrams
std::deque<BufQueue> readBuffer;
std::deque<ReadDatagram> readBuffer;
// Buffers Outgoing Datagrams
std::deque<BufQueue> writeBuffer;
};