mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-11-10 21:22:20 +03:00
use recvmsg on client
Summary: Use the new recvmsg api on the client to receive a packet from AsyncUDPSocket Reviewed By: mjoras Differential Revision: D18797963 fbshipit-source-id: 319d5c41f3a868e7b78947fdbcf2c411b6d7fbf0
This commit is contained in:
committed by
Facebook Github Bot
parent
edca9e794f
commit
02d473e8ec
@@ -1015,37 +1015,62 @@ void QuicClientTransport::errMessage(
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void QuicClientTransport::getReadBuffer(void** buf, size_t* len) noexcept {
|
void QuicClientTransport::getReadBuffer(void**, size_t*) noexcept {}
|
||||||
DCHECK(conn_) << "trying to receive packets without a connection";
|
|
||||||
auto readBufferSize = conn_->transportSettings.maxRecvPacketSize;
|
|
||||||
readBuffer_ = folly::IOBuf::create(readBufferSize);
|
|
||||||
*buf = readBuffer_->writableData();
|
|
||||||
*len = readBufferSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
void QuicClientTransport::onDataAvailable(
|
void QuicClientTransport::onDataAvailable(
|
||||||
const folly::SocketAddress& server,
|
const folly::SocketAddress&,
|
||||||
size_t len,
|
size_t,
|
||||||
bool truncated) noexcept {
|
bool) noexcept {}
|
||||||
VLOG(10) << "Got data from socket peer=" << server << " len=" << len;
|
|
||||||
|
bool QuicClientTransport::shouldOnlyNotify() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void QuicClientTransport::onNotifyDataAvailable() noexcept {
|
||||||
|
DCHECK(conn_) << "trying to receive packets without a connection";
|
||||||
|
auto readBufferSize = conn_->transportSettings.maxRecvPacketSize;
|
||||||
|
auto readBuffer = folly::IOBuf::create(readBufferSize);
|
||||||
|
|
||||||
|
struct iovec vec;
|
||||||
|
vec.iov_base = readBuffer->writableData();
|
||||||
|
vec.iov_len = readBufferSize;
|
||||||
|
|
||||||
|
struct sockaddr_storage addrStorage;
|
||||||
|
socklen_t addrLen = sizeof(addrStorage);
|
||||||
|
memset(&addrStorage, 0, size_t(addrLen));
|
||||||
|
auto rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
|
||||||
|
rawAddr->sa_family = socket_->address().getFamily();
|
||||||
|
|
||||||
|
struct msghdr msg;
|
||||||
|
memset(&msg, 0, sizeof(msg));
|
||||||
|
msg.msg_name = rawAddr;
|
||||||
|
msg.msg_namelen = addrLen;
|
||||||
|
msg.msg_iov = &vec;
|
||||||
|
msg.msg_iovlen = 1;
|
||||||
|
|
||||||
|
ssize_t ret = socket_->recvmsg(&msg, 0);
|
||||||
|
if (ret < 0) {
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
return onReadError(folly::AsyncSocketException(
|
||||||
|
folly::AsyncSocketException::INTERNAL_ERROR,
|
||||||
|
"::recvmsg() failed",
|
||||||
|
errno));
|
||||||
|
}
|
||||||
|
size_t bytesRead = size_t(ret);
|
||||||
|
folly::SocketAddress server;
|
||||||
|
server.setFromSockaddr(rawAddr, addrLen);
|
||||||
|
VLOG(10) << "Got data from socket peer=" << server << " len=" << bytesRead;
|
||||||
// TODO: we can get better receive time accuracy than this, with
|
// TODO: we can get better receive time accuracy than this, with
|
||||||
// SO_TIMESTAMP or SIOCGSTAMP.
|
// SO_TIMESTAMP or SIOCGSTAMP.
|
||||||
auto packetReceiveTime = Clock::now();
|
auto packetReceiveTime = Clock::now();
|
||||||
Buf data = std::move(readBuffer_);
|
readBuffer->append(bytesRead);
|
||||||
if (truncated) {
|
QUIC_TRACE(udp_recvd, *conn_, bytesRead);
|
||||||
// This is an error, drop the packet.
|
|
||||||
if (conn_->qLogger) {
|
|
||||||
conn_->qLogger->addPacketDrop(len, kUdpTruncated);
|
|
||||||
}
|
|
||||||
QUIC_TRACE(packet_drop, *conn_, "udp_truncated");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
data->append(len);
|
|
||||||
QUIC_TRACE(udp_recvd, *conn_, (uint64_t)len);
|
|
||||||
if (conn_->qLogger) {
|
if (conn_->qLogger) {
|
||||||
conn_->qLogger->addDatagramReceived(len);
|
conn_->qLogger->addDatagramReceived(bytesRead);
|
||||||
}
|
}
|
||||||
NetworkData networkData(std::move(data), packetReceiveTime);
|
NetworkData networkData(std::move(readBuffer), packetReceiveTime);
|
||||||
onNetworkData(server, std::move(networkData));
|
onNetworkData(server, std::move(networkData));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -142,13 +142,14 @@ class QuicClientTransport
|
|||||||
};
|
};
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void getReadBuffer(void** buf, size_t* len) noexcept override;
|
|
||||||
|
|
||||||
// From AsyncUDPSocket::ReadCallback
|
// From AsyncUDPSocket::ReadCallback
|
||||||
|
void getReadBuffer(void** buf, size_t* len) noexcept override;
|
||||||
void onDataAvailable(
|
void onDataAvailable(
|
||||||
const folly::SocketAddress& server,
|
const folly::SocketAddress& server,
|
||||||
size_t len,
|
size_t len,
|
||||||
bool truncated) noexcept override;
|
bool truncated) noexcept override;
|
||||||
|
bool shouldOnlyNotify() override;
|
||||||
|
void onNotifyDataAvailable() noexcept override;
|
||||||
|
|
||||||
void processUDPData(
|
void processUDPData(
|
||||||
const folly::SocketAddress& peer,
|
const folly::SocketAddress& peer,
|
||||||
@@ -172,7 +173,6 @@ class QuicClientTransport
|
|||||||
void onNewCachedPsk(
|
void onNewCachedPsk(
|
||||||
fizz::client::NewCachedPsk& newCachedPsk) noexcept override;
|
fizz::client::NewCachedPsk& newCachedPsk) noexcept override;
|
||||||
|
|
||||||
Buf readBuffer_;
|
|
||||||
folly::Optional<std::string> hostname_;
|
folly::Optional<std::string> hostname_;
|
||||||
HappyEyeballsConnAttemptDelayTimeout happyEyeballsConnAttemptDelayTimeout_;
|
HappyEyeballsConnAttemptDelayTimeout happyEyeballsConnAttemptDelayTimeout_;
|
||||||
bool serverInitialParamsSet_{false};
|
bool serverInitialParamsSet_{false};
|
||||||
|
|||||||
@@ -1483,12 +1483,18 @@ class QuicClientTransportTest : public Test {
|
|||||||
folly::ByteRange data,
|
folly::ByteRange data,
|
||||||
bool writes = true) {
|
bool writes = true) {
|
||||||
ASSERT_TRUE(networkReadCallback);
|
ASSERT_TRUE(networkReadCallback);
|
||||||
uint8_t* buf = nullptr;
|
EXPECT_CALL(*sock, recvmsg(_, _))
|
||||||
size_t len = 0;
|
.WillOnce(Invoke([&](struct msghdr* msg, int) {
|
||||||
networkReadCallback->getReadBuffer((void**)&buf, &len);
|
DCHECK_GT(msg->msg_iovlen, 0);
|
||||||
ASSERT_GT(len, data.size());
|
memcpy(msg->msg_iov[0].iov_base, data.data(), data.size());
|
||||||
memcpy(buf, data.data(), data.size());
|
if (msg->msg_name) {
|
||||||
networkReadCallback->onDataAvailable(addr, data.size(), false);
|
socklen_t msg_len =
|
||||||
|
addr.getAddress(static_cast<sockaddr_storage*>(msg->msg_name));
|
||||||
|
msg->msg_namelen = msg_len;
|
||||||
|
}
|
||||||
|
return data.size();
|
||||||
|
}));
|
||||||
|
networkReadCallback->onNotifyDataAvailable();
|
||||||
if (writes) {
|
if (writes) {
|
||||||
loopForWrites();
|
loopForWrites();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user