diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 5c8bc3d68..8a945b767 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -1889,7 +1889,7 @@ void QuicTransportBase::onNetworkData( builder.addReceivedPacket( SocketObserverInterface::PacketsReceivedEvent::ReceivedPacket:: Builder() - .setPacketReceiveTime(networkData.getReceiveTimePoint()) + .setPacketReceiveTime(packet.timings.receiveTimePoint) .setPacketNumBytes(packet.buf->computeChainDataLength()) .build()); } @@ -1903,13 +1903,9 @@ void QuicTransportBase::onNetworkData( }); } - const auto receiveTimePoint = networkData.getReceiveTimePoint(); auto packets = std::move(networkData).movePackets(); for (auto& packet : packets) { - onReadData( - peer, - NetworkDataSingle( - ReceivedPacket(std::move(packet.buf)), receiveTimePoint)); + onReadData(peer, NetworkDataSingle(std::move(packet))); if (conn_->peerConnectionError) { closeImpl(QuicError( QuicErrorCode(TransportErrorCode::NO_ERROR), "Peer closed")); diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index 819a6bd70..23c37b045 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -290,7 +290,7 @@ class TestQuicTransport } else if (type == TestFrameType::DATAGRAM) { auto buffer = decodeDatagramFrame(cursor); auto frame = DatagramFrame(buffer.second, std::move(buffer.first)); - handleDatagram(*conn_, frame, data.receiveTimePoint); + handleDatagram(*conn_, frame, data.packet.timings.receiveTimePoint); } else if (type == TestFrameType::STREAM_GROUP) { auto res = decodeStreamGroupBuffer(cursor); QuicStreamState* stream = diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index e4fbe4257..85f576d03 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -143,7 +143,8 @@ void QuicClientTransport::processUDPData( for (uint16_t processedPackets = 0; !udpData.empty() && processedPackets < kMaxNumCoalescedPackets; processedPackets++) { - processPacketData(peer, networkData.receiveTimePoint, udpData); + processPacketData( + peer, networkData.packet.timings.receiveTimePoint, udpData); } VLOG_IF(4, !udpData.empty()) << "Leaving " << udpData.chainLength() @@ -158,7 +159,7 @@ void QuicClientTransport::processUDPData( pendingPacket.append(std::move(pendingData.networkData.packet.buf)); processPacketData( pendingData.peer, - pendingData.networkData.receiveTimePoint, + pendingData.networkData.packet.timings.receiveTimePoint, pendingPacket); pendingPacket.move(); } @@ -171,7 +172,7 @@ void QuicClientTransport::processUDPData( pendingPacket.append(std::move(pendingData.networkData.packet.buf)); processPacketData( pendingData.peer, - pendingData.networkData.receiveTimePoint, + pendingData.networkData.packet.timings.receiveTimePoint, pendingPacket); pendingPacket.move(); } diff --git a/quic/common/NetworkData.h b/quic/common/NetworkData.h index c226db5f0..e85fff233 100644 --- a/quic/common/NetworkData.h +++ b/quic/common/NetworkData.h @@ -16,20 +16,36 @@ namespace quic { struct ReceivedPacket { + struct Timings { + // Legacy Receive TimePoint. + // + // This TimePoint is being deprecated in favor of having TimePoints that are + // named specifically for the timing event being tracked. + // + // The meaning of this TimePoint varies by transport implementation and + // settings: it can be the time when recv() started or completed, the socket + // RX timestamp for this packet or another packet received during the same + // call to recv*(), or something else entirely. + // + // TODO(bschlinker): Complete deprecation + TimePoint receiveTimePoint; + }; + ReceivedPacket() = default; explicit ReceivedPacket(Buf&& bufIn) : buf(std::move(bufIn)) {} - // data Buf buf; + Timings timings; }; struct NetworkData { NetworkData() = default; - NetworkData(Buf&& buf, const TimePoint& receiveTime) - : receiveTimePoint_(receiveTime) { + NetworkData(Buf&& buf, const TimePoint& receiveTimePointIn) + : receiveTimePoint_(receiveTimePointIn) { if (buf) { totalData_ = buf->computeChainDataLength(); packets_.emplace_back(std::move(buf)); + packets_.back().timings.receiveTimePoint = receiveTimePointIn; } } @@ -37,12 +53,15 @@ struct NetworkData { std::vector&& packetBufs, const TimePoint& receiveTimePointIn) : receiveTimePoint_(receiveTimePointIn), - packets_([&packetBufs]() { + packets_([&packetBufs, &receiveTimePointIn]() { std::vector result; result.reserve(packetBufs.size()); for (auto& packetBuf : packetBufs) { result.emplace_back(std::move(packetBuf)); } + for (auto& packet : result) { + packet.timings.receiveTimePoint = receiveTimePointIn; + } return result; }()), totalData_([this]() { @@ -59,6 +78,7 @@ struct NetworkData { void addPacket(ReceivedPacket&& packetIn) { packets_.emplace_back(std::move(packetIn)); + packets_.back().timings.receiveTimePoint = receiveTimePoint_; } [[nodiscard]] const std::vector& getPackets() const { @@ -71,6 +91,9 @@ struct NetworkData { void setReceiveTimePoint(const TimePoint& receiveTimePointIn) { receiveTimePoint_ = receiveTimePointIn; + for (auto& packet : packets_) { + packet.timings.receiveTimePoint = receiveTimePointIn; + } } [[nodiscard]] TimePoint getReceiveTimePoint() const { @@ -105,15 +128,23 @@ struct NetworkData { struct NetworkDataSingle { ReceivedPacket packet; - TimePoint receiveTimePoint; size_t totalData{0}; NetworkDataSingle() = default; + explicit NetworkDataSingle(ReceivedPacket&& packetIn) + : packet(std::move(packetIn)) { + if (packet.buf) { + totalData += packet.buf->computeChainDataLength(); + } + } + + // TODO(bschlinker): Deprecate NetworkDataSingle( ReceivedPacket&& packetIn, const TimePoint& receiveTimePointIn) - : packet(std::move(packetIn)), receiveTimePoint(receiveTimePointIn) { + : packet(std::move(packetIn)) { + packet.timings.receiveTimePoint = receiveTimePointIn; if (packet.buf) { totalData += packet.buf->computeChainDataLength(); } diff --git a/quic/server/QuicServerPacketRouter.cpp b/quic/server/QuicServerPacketRouter.cpp index be71095b2..7823fd733 100644 --- a/quic/server/QuicServerPacketRouter.cpp +++ b/quic/server/QuicServerPacketRouter.cpp @@ -129,9 +129,11 @@ void TakeoverPacketHandler::setDestination( void TakeoverPacketHandler::forwardPacketToAnotherServer( const folly::SocketAddress& peerAddress, - Buf data, - const TimePoint& packetReceiveTime) { - // create buffer for the peerAddress address and clientPacketReceiveTime + NetworkData&& networkData) { + const TimePoint receiveTimePoint = networkData.getReceiveTimePoint(); + Buf buf = std::move(networkData).moveAllData(); + + // create buffer for the peerAddress address and receiveTimePoint // Serialize: version (4B), socket(2 + 16)B and time of ack (8B) auto bufSize = sizeof(TakeoverProtocolVersion) + sizeof(uint16_t) + peerAddress.getActualSize() + sizeof(uint64_t); @@ -142,9 +144,9 @@ void TakeoverPacketHandler::forwardPacketToAnotherServer( uint16_t socklen = peerAddress.getAddress(&addrStorage); appender.writeBE(socklen); appender.push((uint8_t*)&addrStorage, socklen); - uint64_t tick = packetReceiveTime.time_since_epoch().count(); + uint64_t tick = receiveTimePoint.time_since_epoch().count(); appender.writeBE(tick); - writeBuffer->prependChain(std::move(data)); + writeBuffer->prependChain(std::move(buf)); forwardPacket(std::move(writeBuffer)); } diff --git a/quic/server/QuicServerPacketRouter.h b/quic/server/QuicServerPacketRouter.h index 6d938ca74..c2feadb95 100644 --- a/quic/server/QuicServerPacketRouter.h +++ b/quic/server/QuicServerPacketRouter.h @@ -80,8 +80,7 @@ class TakeoverPacketHandler { void forwardPacketToAnotherServer( const folly::SocketAddress& peerAddress, - Buf data, - const TimePoint& packetReceiveTime); + NetworkData&& networkData); void processForwardedPacket(const folly::SocketAddress& client, Buf data); diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 5571b95d5..29597e09c 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -513,7 +513,7 @@ void QuicServerTransport::processPendingData(bool async) { pendingPacket.peer, NetworkData( std::move(pendingPacket.networkData.packet.buf), - pendingPacket.networkData.receiveTimePoint)); + pendingPacket.networkData.packet.timings.receiveTimePoint)); if (serverPtr->closeState_ == CloseState::CLOSED) { // The pending data could potentially contain a connection close, or // the app could have triggered a connection close with an error. It diff --git a/quic/server/QuicServerWorker.cpp b/quic/server/QuicServerWorker.cpp index 1279e0c49..1f9711334 100644 --- a/quic/server/QuicServerWorker.cpp +++ b/quic/server/QuicServerWorker.cpp @@ -590,9 +590,8 @@ void QuicServerWorker::forwardNetworkData( "Forwarding packet with unknown connId version from client={} to another process, routingInfo={}", client.describe(), logRoutingInfo(routingData.destinationConnId)); - auto recvTime = networkData.getReceiveTimePoint(); takeoverPktHandler_.forwardPacketToAnotherServer( - client, std::move(networkData).moveAllData(), recvTime); + client, std::move(networkData)); QUIC_STATS(statsCallback_, onPacketForwarded); return; } else { @@ -795,9 +794,8 @@ void QuicServerWorker::dispatchPacketData( "Forwarding packet from client={} to another process, routingInfo={}", client.describe(), logRoutingInfo(dstConnId)); - auto recvTime = networkData.getReceiveTimePoint(); takeoverPktHandler_.forwardPacketToAnotherServer( - client, std::move(networkData).moveAllData(), recvTime); + client, std::move(networkData)); QUIC_STATS(statsCallback_, onPacketForwarded); }); diff --git a/quic/server/state/ServerStateMachine.cpp b/quic/server/state/ServerStateMachine.cpp index 36061438c..03fac4b91 100644 --- a/quic/server/state/ServerStateMachine.cpp +++ b/quic/server/state/ServerStateMachine.cpp @@ -715,7 +715,7 @@ static void handleCipherUnavailable( pendingReadData.peer = readData.peer; pendingReadData.networkData = NetworkDataSingle( ReceivedPacket(std::move(originalData->packet)), - readData.networkData.receiveTimePoint); + readData.networkData.packet.timings.receiveTimePoint); pendingData->emplace_back(std::move(pendingReadData)); VLOG(10) << "Adding pending data to " << toString(originalData->protectionType) @@ -1022,7 +1022,10 @@ void onServerReadDataFromOpen( auto& ackState = getAckState(conn, packetNumberSpace); uint64_t distanceFromExpectedPacketNum = updateLargestReceivedPacketNum( - conn, ackState, packetNum, readData.networkData.receiveTimePoint); + conn, + ackState, + packetNum, + readData.networkData.packet.timings.receiveTimePoint); if (distanceFromExpectedPacketNum > 0) { QUIC_STATS(conn.statsCallback, onOutOfOrderPacketReceived); } @@ -1110,7 +1113,7 @@ void onServerReadDataFromOpen( } }, markPacketLoss, - readData.networkData.receiveTimePoint)); + readData.networkData.packet.timings.receiveTimePoint)); break; } case QuicFrame::Type::RstStreamFrame: { @@ -1268,7 +1271,10 @@ void onServerReadDataFromOpen( // Datagram isn't retransmittable. But we would like to ack them // early. So, make Datagram frames count towards ack policy pktHasRetransmittableData = true; - handleDatagram(conn, frame, readData.networkData.receiveTimePoint); + handleDatagram( + conn, + frame, + readData.networkData.packet.timings.receiveTimePoint); break; } case QuicFrame::Type::ImmediateAckFrame: {