/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

// NOTE(review): the header name after each #include is missing in this view
// of the file — restore them from version control before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace fsp = folly::portability::sockets;

namespace {
constexpr socklen_t kAddrLen = sizeof(sockaddr_storage);
} // namespace

namespace quic {

/**
 * Delegating constructor: builds the transport through the overload that
 * takes no starting packet number, then replaces conn_->ackStates with
 * AckStates(startingPacketNum).
 */
QuicClientTransportLite::QuicClientTransportLite(
    std::shared_ptr evb,
    std::unique_ptr socket,
    std::shared_ptr handshakeFactory,
    size_t connectionIdSize,
    PacketNum startingPacketNum,
    bool useConnectionEndWithErrorCallback)
    : QuicClientTransportLite(
          std::move(evb),
          std::move(socket),
          std::move(handshakeFactory),
          connectionIdSize,
          useConnectionEndWithErrorCallback) {
  conn_->ackStates = AckStates(startingPacketNum);
}

/**
 * Creates the client connection state and wires up connection ids and the
 * read codec.
 *
 * @param connectionIdSize  Length of the randomly generated source connection
 *                          id; 0 selects an empty connection id.
 *
 * handshakeFactory is DCHECK'ed to be non-null before being moved into the
 * connection state.
 */
QuicClientTransportLite::QuicClientTransportLite(
    std::shared_ptr evb,
    std::unique_ptr socket,
    std::shared_ptr handshakeFactory,
    size_t connectionIdSize,
    bool useConnectionEndWithErrorCallback)
    : QuicTransportBaseLite(
          evb,
          std::move(socket),
          useConnectionEndWithErrorCallback),
      happyEyeballsConnAttemptDelayTimeout_(this) {
  DCHECK(handshakeFactory);
  auto tempConn = std::make_unique(std::move(handshakeFactory));
  // Keep a typed raw pointer to the client state; ownership moves to conn_.
  clientConn_ = tempConn.get();
  conn_.reset(tempConn.release());

  auto srcConnId = connectionIdSize > 0
      ? ConnectionId::createRandom(connectionIdSize)
      : ConnectionId(std::vector());
  conn_->clientConnectionId = srcConnId;
  conn_->readCodec = std::make_unique(QuicNodeType::Client);
  conn_->readCodec->setClientConnectionId(srcConnId);
  conn_->selfConnectionIds.emplace_back(srcConnId, kInitialSequenceNumber);
  // The initial destination connection id is random; the same value is
  // recorded as the original and as the client-chosen destination id.
  clientConn_->initialDestinationConnectionId =
      ConnectionId::createRandom(kMinInitialDestinationConnIdLength);
  clientConn_->originalDestinationConnectionId =
      clientConn_->initialDestinationConnectionId;
  conn_->clientChosenDestConnectionId =
      clientConn_->initialDestinationConnectionId;
  VLOG(4) << "initial dcid: "
          << clientConn_->initialDestinationConnectionId->hex();
  if (conn_->qLogger) {
    conn_->qLogger->setDcid(conn_->clientChosenDestConnectionId);
  }

  conn_->readCodec->setCodecParameters(CodecParameters(
      conn_->peerAckDelayExponent,
      conn_->originalVersion.value(),
      conn_->transportSettings.maybeAckReceiveTimestampsConfigSentToPeer,
      conn_->transportSettings.advertisedExtendedAckFeatures));

  VLOG(10) << "client created " << *conn_;
}

/**
 * Drops the connection callbacks, closes the connection without draining and
 * closes both the primary UDP socket and, if present, the second
 * happy-eyeballs socket.
 */
QuicClientTransportLite::~QuicClientTransportLite() {
  VLOG(10) << "Destroyed connection to server=" << conn_->peerAddress;
  // The caller probably doesn't need the conn callback after destroying the
  // transport.
  resetConnectionCallbacks();
  // Close without draining.
  closeImpl(
      QuicError(
          QuicErrorCode(LocalErrorCode::SHUTTING_DOWN),
          std::string("Closing from client destructor")),
      false /* drainConnection */);
  // closeImpl may have been called earlier with drain = true, so force close.
  closeUdpSocket();

  if (clientConn_->happyEyeballsState.secondSocket) {
    auto sock = std::move(clientConn_->happyEyeballsState.secondSocket);
    sock->pauseRead();
    sock->close();
  }
}

/**
 * Processes one received UDP datagram.
 *
 * While no version has been bound to the connection, a version negotiation
 * packet is turned into a NEW_VERSION_NEGOTIATED error. Otherwise up to
 * kMaxNumCoalescedPackets packets are processed from the datagram, and then
 * any packets previously buffered for lack of a 1-RTT or handshake read
 * cipher are retried if that cipher is now available.
 *
 * @return folly::unit on success, or the first error encountered.
 */
folly::Expected QuicClientTransportLite::processUdpPacket(
    const folly::SocketAddress& peer,
    ReceivedUdpPacket&& udpPacket) {
  // Process the arriving UDP packet, which may have coalesced QUIC packets.
{
    BufQueue& udpData = udpPacket.buf;

    if (!conn_->version) {
      // We only check for version negotiation packets before the version
      // is negotiated.
      auto versionNegotiation =
          conn_->readCodec->tryParsingVersionNegotiation(udpData);
      if (versionNegotiation) {
        VLOG(4) << "Got version negotiation packet from peer=" << peer
                << " versions=" << std::hex << versionNegotiation->versions
                << " " << *this;

        return folly::makeUnexpected(QuicError(
            LocalErrorCode::NEW_VERSION_NEGOTIATED,
            "Received version negotiation packet"));
      }
    }

    // Bound the work per datagram: stop after kMaxNumCoalescedPackets even if
    // bytes remain in the buffer.
    for (uint16_t processedPackets = 0;
         !udpData.empty() && processedPackets < kMaxNumCoalescedPackets;
         processedPackets++) {
      auto res = processUdpPacketData(peer, udpPacket);
      if (res.hasError()) {
        return res;
      }
    }
    VLOG_IF(4, !udpData.empty())
        << "Leaving " << udpData.chainLength()
        << " bytes unprocessed after attempting to process "
        << kMaxNumCoalescedPackets << " packets.";
  }

  // Process any deferred pending 1RTT and handshake packets if we have keys.
  if (conn_->readCodec->getOneRttReadCipher() &&
      !clientConn_->pendingOneRttData.empty()) {
    for (auto& pendingPacket : clientConn_->pendingOneRttData) {
      // The first loop should try to process any leftover data in the incoming
      // buffer.
      pendingPacket.udpPacket.buf.append(udpPacket.buf.move());

      auto res =
          processUdpPacketData(pendingPacket.peer, pendingPacket.udpPacket);
      if (res.hasError()) {
        // The pending queue is not cleared on this early return.
        return res;
      }
    }
    clientConn_->pendingOneRttData.clear();
  }
  if (conn_->readCodec->getHandshakeReadCipher() &&
      !clientConn_->pendingHandshakeData.empty()) {
    for (auto& pendingPacket : clientConn_->pendingHandshakeData) {
      // The first loop should try to process any leftover data in the incoming
      // buffer.
      pendingPacket.udpPacket.buf.append(udpPacket.buf.move());

      auto res =
          processUdpPacketData(pendingPacket.peer, pendingPacket.udpPacket);
      if (res.hasError()) {
        // The pending queue is not cleared on this early return.
        return res;
      }
    }
    clientConn_->pendingHandshakeData.clear();
  }

  return folly::unit;
}

/**
 * Parses udpPacket.buf with the read codec and handles the parse result.
 *
 * The result is handled by kind, in this order: stateless reset, retry
 * packet, packet whose cipher is not yet available (buffered for later),
 * codec error, and finally a regular packet whose frames are dispatched.
 *
 * @param peer       Address the datagram was received from.
 * @param udpPacket  Datagram buffer plus receive metadata (timings, TOS).
 * @return folly::unit when the packet was processed or deliberately ignored;
 *         an error when the connection must be torn down.
 */
folly::Expected QuicClientTransportLite::processUdpPacketData(
    const folly::SocketAddress& peer,
    ReceivedUdpPacket& udpPacket) {
  auto packetSize = udpPacket.buf.chainLength();
  if (packetSize == 0) {
    return folly::unit;
  }
  auto parsedPacket = conn_->readCodec->parsePacket(
      udpPacket.buf, conn_->ackStates, conn_->clientConnectionId->size());
  StatelessReset* statelessReset = parsedPacket.statelessReset();
  if (statelessReset) {
    const auto& token = clientConn_->statelessResetToken;
    if (statelessReset->token == token) {
      VLOG(4) << "Received Stateless Reset " << *this;
      // The peer-facing error is recorded as CONNECTION_RESET, while the
      // value returned to the caller carries NO_ERROR.
      conn_->peerConnectionError = QuicError(
          QuicErrorCode(LocalErrorCode::CONNECTION_RESET),
          toString(LocalErrorCode::CONNECTION_RESET).str());
      return folly::makeUnexpected(
          QuicError(LocalErrorCode::NO_ERROR, "Stateless Reset Received"));
    }
    VLOG(4) << "Drop StatelessReset for bad connId or token " << *this;
    // Don't treat this as a fatal error, just ignore the packet.
    return folly::unit;
  }

  RetryPacket* retryPacket = parsedPacket.retryPacket();
  if (retryPacket) {
    if (conn_->qLogger) {
      conn_->qLogger->addPacket(*retryPacket, packetSize, true);
    }
    // we reject retry packet if our initial has been processed or we've rx'd a
    // prior retry packet; note that initialAckState is reset to nullptr only
    // after we've confirmed handshake.
    bool shouldRejectRetryPacket = !conn_->ackStates.initialAckState ||
        conn_->ackStates.initialAckState->largestRecvdPacketNum.has_value() ||
        !clientConn_->retryToken.empty();

    if (shouldRejectRetryPacket) {
      VLOG(4) << "Server incorrectly issued a retry packet; dropping retry "
              << *this;
      // Not a fatal error, just ignore the packet.
return folly::unit;
    }

    const ConnectionId* originalDstConnId =
        &(*clientConn_->originalDestinationConnectionId);

    if (!clientConn_->clientHandshakeLayer->verifyRetryIntegrityTag(
            *originalDstConnId, *retryPacket)) {
      VLOG(4) << "The integrity tag in the retry packet was invalid. "
              << "Dropping bad retry packet. " << *this;
      // Not a fatal error, just ignore the packet.
      return folly::unit;
    }

    // NOTE(review): unlike the regular-packet path further down, socket_ is
    // not CHECK'ed before this call — confirm it cannot be null here.
    if (happyEyeballsEnabled_) {
      happyEyeballsOnDataReceived(
          *clientConn_, happyEyeballsConnAttemptDelayTimeout_, socket_, peer);
    }
    // Set the destination connection ID to be the value from the source
    // connection id of the retry packet
    clientConn_->initialDestinationConnectionId =
        retryPacket->header.getSourceConnId();

    // Rebuild the connection state for the retry: conn_ gives up ownership,
    // undoAllClientStateForRetry produces the replacement state, and both
    // clientConn_ and conn_ are re-pointed at it.
    auto released = static_cast(conn_.release());
    std::unique_ptr uniqueClient(released);
    auto tempConn = undoAllClientStateForRetry(std::move(uniqueClient));

    clientConn_ = tempConn.get();
    conn_.reset(tempConn.release());

    clientConn_->retryToken = retryPacket->header.getToken();

    // TODO (amsharma): add a "RetryPacket" QLog event, and log it here.
    // TODO (amsharma): verify the "original_connection_id" parameter
    // upon receiving a subsequent initial from the server.

    auto handshakeResult = startCryptoHandshake();
    if (handshakeResult.hasError()) {
      return folly::makeUnexpected(handshakeResult.error());
    }
    return folly::unit; // Retry processed successfully
  }

  // The packet could not be decrypted because its cipher is not available
  // yet. Buffer KeyPhaseZero / Handshake packets, bounded by
  // maxPacketsToBuffer across both queues, so processUdpPacket can retry them
  // once the matching read cipher exists.
  auto cipherUnavailable = parsedPacket.cipherUnavailable();
  if (cipherUnavailable && cipherUnavailable->packet &&
      !cipherUnavailable->packet->empty() &&
      (cipherUnavailable->protectionType == ProtectionType::KeyPhaseZero ||
       cipherUnavailable->protectionType == ProtectionType::Handshake) &&
      clientConn_->pendingOneRttData.size() +
              clientConn_->pendingHandshakeData.size() <
          clientConn_->transportSettings.maxPacketsToBuffer) {
    auto& pendingData =
        cipherUnavailable->protectionType == ProtectionType::KeyPhaseZero
        ? clientConn_->pendingOneRttData
        : clientConn_->pendingHandshakeData;
    pendingData.emplace_back(
        ReceivedUdpPacket(
            std::move(cipherUnavailable->packet),
            udpPacket.timings,
            udpPacket.tosValue),
        peer);
    if (conn_->qLogger) {
      conn_->qLogger->addPacketBuffered(
          cipherUnavailable->protectionType, packetSize);
    }
    // Packet buffered, not an error
    return folly::unit;
  }

  auto codecError = parsedPacket.codecError();
  if (codecError) {
    // NOTE(review): asTransportErrorCode() is dereferenced unconditionally;
    // this presumably relies on codec errors always carrying a transport
    // error code — confirm against the codec.
    return folly::makeUnexpected(QuicError(
        *codecError->error.code.asTransportErrorCode(),
        std::move(codecError->error.message)));
  }

  RegularQuicPacket* regularOptional = parsedPacket.regularPacket();
  if (!regularOptional) {
    VLOG(4) << "Packet parse error for " << *this;
    QUIC_STATS(
        statsCallback_, onPacketDropped, PacketDropReason::PARSE_ERROR_CLIENT);
    if (conn_->qLogger) {
      conn_->qLogger->addPacketDrop(packetSize, kParse);
    }
    // If this was a protocol violation, we would return a codec error instead.
    // Ignore this case as something that caused a non-codec parse error.
    return folly::unit;
  }

  if (regularOptional->frames.empty()) {
    // This is either a packet that has no data (long-header parsed but no data
    // found) or a regular packet with a short header and no frames. Both are
    // protocol violations.
    LOG(ERROR) << "Packet has no frames " << *this;
    QUIC_STATS(
        conn_->statsCallback,
        onPacketDropped,
        PacketDropReason::PROTOCOL_VIOLATION);
    if (conn_->qLogger) {
      conn_->qLogger->addPacketDrop(
          packetSize,
          PacketDropReason(PacketDropReason::PROTOCOL_VIOLATION)._to_string());
    }
    return folly::makeUnexpected(QuicError(
        TransportErrorCode::PROTOCOL_VIOLATION, "Packet has no frames"));
  }

  if (happyEyeballsEnabled_) {
    CHECK(socket_);
    happyEyeballsOnDataReceived(
        *clientConn_, happyEyeballsConnAttemptDelayTimeout_, socket_, peer);
  }

  LongHeader* longHeader = regularOptional->header.asLong();
  ShortHeader* shortHeader = regularOptional->header.asShort();

  auto protectionLevel = regularOptional->header.getProtectionType();
  auto encryptionLevel = protectionTypeToEncryptionLevel(protectionLevel);

  auto packetNum = regularOptional->header.getPacketSequenceNum();
  auto pnSpace = regularOptional->header.getPacketNumberSpace();

  bool isProtectedPacket = protectionLevel == ProtectionType::KeyPhaseZero ||
      protectionLevel == ProtectionType::KeyPhaseOne;

  auto& regularPacket = *regularOptional;
  if (conn_->qLogger) {
    conn_->qLogger->addPacket(regularPacket, packetSize);
  }
  // Packets that are not 1-RTT protected may only carry PADDING, ACK,
  // CONNECTION_CLOSE, CRYPTO and PING frames; anything else is rejected as a
  // protocol violation before any frame is acted upon.
  if (!isProtectedPacket) {
    for (auto& quicFrame : regularPacket.frames) {
      auto isPadding = quicFrame.asPaddingFrame();
      auto isAck = quicFrame.asReadAckFrame();
      auto isClose = quicFrame.asConnectionCloseFrame();
      auto isCrypto = quicFrame.asReadCryptoFrame();
      auto isPing = quicFrame.asPingFrame();
      // TODO: add path challenge and response
      if (!isPadding && !isAck && !isClose && !isCrypto && !isPing) {
        return folly::makeUnexpected(
            QuicError(TransportErrorCode::PROTOCOL_VIOLATION, "Invalid frame"));
      }
    }
  }

  // We got a packet that was not the version negotiation packet, that means
  // that the version is now bound to the new packet.
  if (!conn_->version) {
    conn_->version = conn_->originalVersion;
  }

  // Adopt the server's source connection id from the first long-header packet
  // seen while no server connection id is known. Note this happens before the
  // destination connection id check below.
  if (!conn_->serverConnectionId && longHeader) {
    conn_->serverConnectionId = longHeader->getSourceConnId();
    conn_->peerConnectionIds.emplace_back(
        longHeader->getSourceConnId(), kInitialSequenceNumber);
    conn_->readCodec->setServerConnectionId(*conn_->serverConnectionId);
  }

  // Error out if the connection id on the packet is not the one that is
  // expected.
  bool connidMatched = true;
  if ((longHeader &&
       longHeader->getDestinationConnId() != *conn_->clientConnectionId) ||
      (shortHeader &&
       shortHeader->getConnectionId() != *conn_->clientConnectionId)) {
    connidMatched = false;
  }
  if (!connidMatched) {
    return folly::makeUnexpected(QuicError(
        TransportErrorCode::PROTOCOL_VIOLATION, "Invalid connection id"));
  }

  // Add the packet to the AckState associated with the packet number space.
  auto& ackState = getAckState(*conn_, pnSpace);
  uint64_t distanceFromExpectedPacketNum =
      addPacketToAckState(*conn_, ackState, packetNum, udpPacket);
  if (distanceFromExpectedPacketNum > 0) {
    QUIC_STATS(conn_->statsCallback, onOutOfOrderPacketReceived);
  }

  // Accumulated across the frame loop below and fed into
  // updateAckSendStateOnRecvPacket at the end of the function.
  bool pktHasRetransmittableData = false;
  bool pktHasCryptoData = false;

  // Invoked once per outstanding packet that an incoming ACK frame covers.
  AckedPacketVisitor ackedPacketVisitor =
      [&](const OutstandingPacketWrapper& outstandingPacket) {
        auto outstandingProtectionType =
            outstandingPacket.packet.header.getProtectionType();
        if (outstandingProtectionType == ProtectionType::KeyPhaseZero) {
          // If we received an ack for data that we sent in 1-rtt from
          // the server, we can assume that the server had successfully
          // derived the 1-rtt keys and hence received the client
          // finished message. We can mark the handshake as confirmed and
          // drop the handshake cipher and outstanding packets after the
          // processing loop.
          conn_->handshakeLayer->handshakeConfirmed();
        }
        return maybeVerifyPendingKeyUpdate(
            *conn_, outstandingPacket, regularPacket);
      };

  // Invoked once per frame of each acked outstanding packet; dispatches on
  // the type of the frame we originally wrote.
  AckedFrameVisitor ackedFrameVisitor =
      [&](const OutstandingPacketWrapper& outstandingPacket,
          const QuicWriteFrame& packetFrame)
      -> folly::Expected {
    auto outstandingProtectionType =
        outstandingPacket.packet.header.getProtectionType();
    switch (packetFrame.type()) {
      case QuicWriteFrame::Type::WriteAckFrame: {
        const WriteAckFrame& frame = *packetFrame.asWriteAckFrame();
        DCHECK(!frame.ackBlocks.empty());
        VLOG(4) << "Client received ack for largestAcked="
                << frame.ackBlocks.front().end << " " << *this;
        commonAckVisitorForAckFrame(ackState, frame);
        break;
      }
      case QuicWriteFrame::Type::RstStreamFrame: {
        const RstStreamFrame& frame = *packetFrame.asRstStreamFrame();
        VLOG(4) << "Client received ack for reset frame stream="
                << frame.streamId << " " << *this;

        // A lookup failure is treated the same as a missing stream here.
        auto stream =
            conn_->streamManager->getStream(frame.streamId).value_or(nullptr);
        if (stream) {
          return sendRstAckSMHandler(*stream, frame.reliableSize);
        }
        break;
      }
      case QuicWriteFrame::Type::WriteStreamFrame: {
        const WriteStreamFrame& frame = *packetFrame.asWriteStreamFrame();

        auto ackedStreamResult =
            conn_->streamManager->getStream(frame.streamId);
        if (ackedStreamResult.hasError()) {
          return folly::makeUnexpected(ackedStreamResult.error());
        }
        auto& ackedStream = ackedStreamResult.value();
        VLOG(4) << "Client got ack for stream=" << frame.streamId
                << " offset=" << frame.offset << " fin=" << frame.fin
                << " data=" << frame.len
                << " closed=" << (ackedStream == nullptr) << " " << *this;
        if (ackedStream) {
          return sendAckSMHandler(*ackedStream, frame);
        }
        break;
      }
      case QuicWriteFrame::Type::WriteCryptoFrame: {
        const WriteCryptoFrame& frame = *packetFrame.asWriteCryptoFrame();
        auto cryptoStream = getCryptoStream(
            *conn_->cryptoState,
            protectionTypeToEncryptionLevel(outstandingProtectionType));
        processCryptoStreamAck(*cryptoStream, frame.offset, frame.len);
        break;
      }
      case QuicWriteFrame::Type::PingFrame:
        conn_->pendingEvents.cancelPingTimeout = true;
        break;
      case QuicWriteFrame::Type::QuicSimpleFrame:
      default:
        // ignore other frames.
        break;
    }
    return folly::unit;
  };

  // Dispatch every frame of the received packet.
  for (auto& quicFrame : regularPacket.frames) {
    switch (quicFrame.type()) {
      case QuicFrame::Type::ReadAckFrame: {
        VLOG(10) << "Client received ack frame in packet=" << packetNum << " "
                 << *this;
        ReadAckFrame& ackFrame = *quicFrame.asReadAckFrame();

        // Extended / timestamped ACK frames are only acceptable if the
        // corresponding local transport setting is set.
        if (ackFrame.frameType == FrameType::ACK_EXTENDED &&
            !conn_->transportSettings.advertisedExtendedAckFeatures) {
          return folly::makeUnexpected(QuicError(
              TransportErrorCode::PROTOCOL_VIOLATION,
              "Received unexpected ACK_EXTENDED frame"));
        } else if (
            ackFrame.frameType == FrameType::ACK_RECEIVE_TIMESTAMPS &&
            !conn_->transportSettings
                 .maybeAckReceiveTimestampsConfigSentToPeer) {
          return folly::makeUnexpected(QuicError(
              TransportErrorCode::PROTOCOL_VIOLATION,
              "Received unexpected ACK_RECEIVE_TIMESTAMPS frame"));
        }

        auto result = processAckFrame(
            *conn_,
            pnSpace,
            ackFrame,
            ackedPacketVisitor,
            ackedFrameVisitor,
            markPacketLoss,
            udpPacket.timings.receiveTimePoint);
        if (result.hasError()) {
          return folly::makeUnexpected(result.error());
        }
        conn_->lastProcessedAckEvents.emplace_back(std::move(result.value()));
        break;
      }
      case QuicFrame::Type::RstStreamFrame: {
        RstStreamFrame& frame = *quicFrame.asRstStreamFrame();
        VLOG(10) << "Client received reset stream=" << frame.streamId << " "
                 << *this;
        if (frame.reliableSize.hasValue()) {
          // We're not yet supporting the handling of RESET_STREAM_AT frames
          return folly::makeUnexpected(QuicError(
              TransportErrorCode::PROTOCOL_VIOLATION,
              "Reliable resets not supported"));
        }
        pktHasRetransmittableData = true;
        auto streamResult = conn_->streamManager->getStream(frame.streamId);
        if (streamResult.hasError()) {
          return folly::makeUnexpected(streamResult.error());
        }
        auto& stream = streamResult.value();
        if (!stream) {
          break;
        }
        auto rstResult = receiveRstStreamSMHandler(*stream, frame);
        if (rstResult.hasError()) {
          return folly::makeUnexpected(rstResult.error());
        }
        break;
      }
      case QuicFrame::Type::ReadCryptoFrame: {
        pktHasRetransmittableData = true;
        pktHasCryptoData = true;
        ReadCryptoFrame& cryptoFrame = *quicFrame.asReadCryptoFrame();
        VLOG(10) << "Client received crypto data offset=" << cryptoFrame.offset
                 << " len=" << cryptoFrame.data->computeChainDataLength()
                 << " packetNum=" << packetNum << " " << *this;
        // Only buffered here; the data is read and fed to the handshake layer
        // after the frame loop.
        auto appendResult = appendDataToReadBuffer(
            *getCryptoStream(*conn_->cryptoState, encryptionLevel),
            StreamBuffer(
                std::move(cryptoFrame.data), cryptoFrame.offset, false));
        if (appendResult.hasError()) {
          return folly::makeUnexpected(appendResult.error());
        }
        break;
      }
      case QuicFrame::Type::ReadStreamFrame: {
        ReadStreamFrame& frame = *quicFrame.asReadStreamFrame();
        VLOG(10) << "Client received stream data for stream=" << frame.streamId
                 << " offset=" << frame.offset
                 << " len=" << frame.data->computeChainDataLength()
                 << " fin=" << frame.fin << " packetNum=" << packetNum << " "
                 << *this;
        auto streamResult = conn_->streamManager->getStream(frame.streamId);
        if (streamResult.hasError()) {
          return folly::makeUnexpected(streamResult.error());
        }
        auto& stream = streamResult.value();
        pktHasRetransmittableData = true;
        if (!stream) {
          VLOG(10) << "Could not find stream=" << frame.streamId << " "
                   << *conn_;
          break;
        }
        auto readResult =
            receiveReadStreamFrameSMHandler(*stream, std::move(frame));
        if (readResult.hasError()) {
          return folly::makeUnexpected(readResult.error());
        }
        break;
      }
      case QuicFrame::Type::ReadNewTokenFrame: {
        ReadNewTokenFrame& newTokenFrame = *quicFrame.asReadNewTokenFrame();
        std::string tokenStr = newTokenFrame.token->to();
        VLOG(10) << "client received new token token="
                 << folly::hexlify(tokenStr);
        if (newTokenCallback_) {
          newTokenCallback_(std::move(tokenStr));
        }
        break;
      }
      case QuicFrame::Type::MaxDataFrame: {
        MaxDataFrame& connWindowUpdate = *quicFrame.asMaxDataFrame();
        VLOG(10) << "Client received max data offset="
                 << connWindowUpdate.maximumData << " " << *this;
        pktHasRetransmittableData = true;
        handleConnWindowUpdate(*conn_, connWindowUpdate, packetNum);
        break;
      }
      case QuicFrame::Type::MaxStreamDataFrame: {
        MaxStreamDataFrame& streamWindowUpdate =
            *quicFrame.asMaxStreamDataFrame();
        VLOG(10) << "Client received max stream data stream="
                 << streamWindowUpdate.streamId
                 << " offset=" << streamWindowUpdate.maximumData << " "
                 << *this;
        if (isReceivingStream(conn_->nodeType, streamWindowUpdate.streamId)) {
          return folly::makeUnexpected(QuicError(
              TransportErrorCode::STREAM_STATE_ERROR,
              "Received MaxStreamDataFrame for receiving stream."));
        }
        pktHasRetransmittableData = true;
        auto streamResult =
            conn_->streamManager->getStream(streamWindowUpdate.streamId);
        if (streamResult.hasError()) {
          return folly::makeUnexpected(streamResult.error());
        }
        auto& stream = streamResult.value();
        if (stream) {
          handleStreamWindowUpdate(
              *stream, streamWindowUpdate.maximumData, packetNum);
        }
        break;
      }
      case QuicFrame::Type::DataBlockedFrame: {
        VLOG(10) << "Client received blocked " << *this;
        pktHasRetransmittableData = true;
        handleConnBlocked(*conn_);
        break;
      }
      case QuicFrame::Type::StreamDataBlockedFrame: {
        // peer wishes to send data, but is unable to due to stream-level flow
        // control
        StreamDataBlockedFrame& blocked = *quicFrame.asStreamDataBlockedFrame();
        VLOG(10) << "Client received blocked stream=" << blocked.streamId << " "
                 << *this;
        pktHasRetransmittableData = true;
        auto streamResult = conn_->streamManager->getStream(blocked.streamId);
        if (streamResult.hasError()) {
          return folly::makeUnexpected(streamResult.error());
        }
        auto& stream = streamResult.value();
        if (stream) {
          handleStreamBlocked(*stream);
        }
        break;
      }
      case QuicFrame::Type::StreamsBlockedFrame: {
        // peer wishes to open a stream, but is unable to due to the maximum
        // stream limit set by us
        StreamsBlockedFrame& blocked = *quicFrame.asStreamsBlockedFrame();
        VLOG(10) << "Client received stream blocked limit="
                 << blocked.streamLimit << " " << *this;
        // TODO implement handler for it
        break;
      }
      case QuicFrame::Type::ConnectionCloseFrame: {
        ConnectionCloseFrame& connFrame = *quicFrame.asConnectionCloseFrame();
        auto errMsg = folly::to(
            "Client closed by peer reason=", connFrame.reasonPhrase);
        VLOG(4) << errMsg << " " << *this;
        // we want to deliver app callbacks with the peer supplied error,
        // but send a NO_ERROR to the peer.
        if (conn_->qLogger) {
          conn_->qLogger->addTransportStateUpdate(getPeerClose(errMsg));
        }
        conn_->peerConnectionError =
            QuicError(QuicErrorCode(connFrame.errorCode), std::move(errMsg));
        // We don't return an error here, as receiving a close triggers the
        // peer connection error path instead of the local error path.
        // Returning from inside the loop also skips any remaining frames and
        // the ack-state update at the end of this function.
        return folly::unit;
      }
      case QuicFrame::Type::PingFrame:
        // Ping isn't retransmittable. But we would like to ack them early.
        // So, make Ping frames count towards ack policy
        pktHasRetransmittableData = true;
        conn_->pendingEvents.notifyPingReceived = true;
        break;
      case QuicFrame::Type::PaddingFrame:
        break;
      case QuicFrame::Type::QuicSimpleFrame: {
        QuicSimpleFrame& simpleFrame = *quicFrame.asQuicSimpleFrame();
        pktHasRetransmittableData = true;
        auto updateResult = updateSimpleFrameOnPacketReceived(
            *conn_,
            simpleFrame,
            longHeader ? longHeader->getDestinationConnId()
                       : shortHeader->getConnectionId(),
            false);
        if (updateResult.hasError()) {
          return folly::makeUnexpected(updateResult.error());
        }
        break;
      }
      case QuicFrame::Type::DatagramFrame: {
        DatagramFrame& frame = *quicFrame.asDatagramFrame();
        VLOG(10) << "Client received datagram data: "
                 << "len=" << frame.length << " " << *this;
        // Datagram isn't retransmittable. But we would like to ack them early.
        // So, make Datagram frames count towards ack policy
        pktHasRetransmittableData = true;
        handleDatagram(*conn_, frame, udpPacket.timings.receiveTimePoint);
        break;
      }
      case QuicFrame::Type::ImmediateAckFrame: {
        if (!conn_->transportSettings.minAckDelay.has_value()) {
          // We do not accept IMMEDIATE_ACK frames. This is a protocol
          // violation.
          return folly::makeUnexpected(QuicError(
              TransportErrorCode::PROTOCOL_VIOLATION,
              "Received IMMEDIATE_ACK frame without announcing min_ack_delay"));
        }
        // Send an ACK from any packet number space.
        if (conn_->ackStates.initialAckState) {
          conn_->ackStates.initialAckState->needsToSendAckImmediately = true;
        }
        if (conn_->ackStates.handshakeAckState) {
          conn_->ackStates.handshakeAckState->needsToSendAckImmediately = true;
        }
        conn_->ackStates.appDataAckState.needsToSendAckImmediately = true;
        break;
      }
      default:
        break;
    }
  }

  auto handshakeLayer = clientConn_->clientHandshakeLayer;
  if (handshakeLayer->getPhase() == ClientHandshake::Phase::Established &&
      hasInitialOrHandshakeCiphers(*conn_)) {
    handshakeConfirmed(*conn_);
  }

  maybeScheduleAckForCongestionFeedback(udpPacket, ackState);
  maybeHandleIncomingKeyUpdate(*conn_);

  // Try reading bytes off of crypto, and performing a handshake.
  auto cryptoData = readDataFromCryptoStream(
      *getCryptoStream(*conn_->cryptoState, encryptionLevel));
  if (cryptoData) {
    // Snapshot whether we already had a 1-RTT write key so we can detect the
    // handshake step that derives it.
    bool hadOneRttKey = conn_->oneRttWriteCipher != nullptr;
    handshakeLayer->doHandshake(std::move(cryptoData), encryptionLevel);
    bool oneRttKeyDerivationTriggered = false;
    if (!hadOneRttKey && conn_->oneRttWriteCipher) {
      oneRttKeyDerivationTriggered = true;
      updatePacingOnKeyEstablished(*conn_);
    }
    // Once both 1-RTT directions have keys, the 0-RTT write keys are dropped.
    if (conn_->oneRttWriteCipher && conn_->readCodec->getOneRttReadCipher()) {
      clientConn_->zeroRttWriteCipher.reset();
      clientConn_->zeroRttWriteHeaderCipher.reset();
    }
    // Resolve the 0-RTT outcome at most once: it stays unset until the
    // handshake layer reports accepted or rejected.
    if (!clientConn_->zeroRttRejected.has_value()) {
      clientConn_->zeroRttRejected = handshakeLayer->getZeroRttRejected();
      if (clientConn_->zeroRttRejected.has_value() &&
          *clientConn_->zeroRttRejected) {
        if (conn_->qLogger) {
          conn_->qLogger->addTransportStateUpdate(kZeroRttRejected);
        }
        QUIC_STATS(conn_->statsCallback, onZeroRttRejected);
        handshakeLayer->removePsk(hostname_);
        if (!handshakeLayer->getCanResendZeroRtt().value_or(false)) {
          return folly::makeUnexpected(QuicError(
              TransportErrorCode::TRANSPORT_PARAMETER_ERROR,
              "Zero-rtt attempted but the early parameters do not match the handshake parameters"));
        }
      } else if (clientConn_->zeroRttRejected.has_value()) {
        if (conn_->qLogger) {
          conn_->qLogger->addTransportStateUpdate(kZeroRttAccepted);
        }
        QUIC_STATS(conn_->statsCallback, onZeroRttAccepted);
        conn_->usedZeroRtt = true;
      }
    }
    // We should get transport parameters if we've derived 1-rtt keys and 0-rtt
    // was rejected, or we have derived 1-rtt keys and 0-rtt was never
    // attempted.
    if (oneRttKeyDerivationTriggered) {
      const auto& serverParams = handshakeLayer->getServerTransportParams();
      if (!serverParams) {
        return folly::makeUnexpected(QuicError(
            TransportErrorCode::TRANSPORT_PARAMETER_ERROR,
            "No server transport params"));
      }
      if ((clientConn_->zeroRttRejected.has_value() &&
           *clientConn_->zeroRttRejected) ||
          !clientConn_->zeroRttRejected.has_value()) {
        // Remember the flow-control limits in effect before applying the
        // server's parameters, for the rejection comparison below.
        auto originalPeerMaxOffset =
            conn_->flowControlState.peerAdvertisedMaxOffset;
        auto originalPeerInitialStreamOffsetBidiLocal =
            conn_->flowControlState
                .peerAdvertisedInitialMaxStreamOffsetBidiLocal;
        auto originalPeerInitialStreamOffsetBidiRemote =
            conn_->flowControlState
                .peerAdvertisedInitialMaxStreamOffsetBidiRemote;
        auto originalPeerInitialStreamOffsetUni =
            conn_->flowControlState.peerAdvertisedInitialMaxStreamOffsetUni;
        VLOG(10) << "Client negotiated transport params " << *this;
        auto maxStreamsBidi = getIntegerParameter(
            TransportParameterId::initial_max_streams_bidi,
            serverParams->parameters);
        auto maxStreamsUni = getIntegerParameter(
            TransportParameterId::initial_max_streams_uni,
            serverParams->parameters);
        auto processResult = processServerInitialParams(
            *clientConn_, serverParams.value(), packetNum);
        if (processResult.hasError()) {
          return folly::makeUnexpected(processResult.error());
        }

        cacheServerInitialParams(
            *clientConn_,
            conn_->flowControlState.peerAdvertisedMaxOffset,
            conn_->flowControlState
                .peerAdvertisedInitialMaxStreamOffsetBidiLocal,
            conn_->flowControlState
                .peerAdvertisedInitialMaxStreamOffsetBidiRemote,
            conn_->flowControlState.peerAdvertisedInitialMaxStreamOffsetUni,
            maxStreamsBidi.value_or(0),
            maxStreamsUni.value_or(0),
            conn_->peerAdvertisedKnobFrameSupport,
            conn_->maybePeerAckReceiveTimestampsConfig.has_value(),
            conn_->maybePeerAckReceiveTimestampsConfig
                ? conn_->maybePeerAckReceiveTimestampsConfig
                      ->maxReceiveTimestampsPerAck
                : 0,
            conn_->maybePeerAckReceiveTimestampsConfig
                ? conn_->maybePeerAckReceiveTimestampsConfig
                      ->receiveTimestampsExponent
                : 3,
            conn_->peerAdvertisedReliableStreamResetSupport,
            conn_->peerAdvertisedExtendedAckFeatures);

        if (clientConn_->zeroRttRejected.has_value() &&
            *clientConn_->zeroRttRejected) {
          // verify that the new flow control parameters are >= the original
          // transport parameters that were use. This is the easy case. If the
          // flow control decreases then we are just screwed and we need to have
          // the app retry the connection. The other parameters can be updated.
          // TODO: implement undo transport state on retry.
          if (originalPeerMaxOffset >
                  conn_->flowControlState.peerAdvertisedMaxOffset ||
              originalPeerInitialStreamOffsetBidiLocal >
                  conn_->flowControlState
                      .peerAdvertisedInitialMaxStreamOffsetBidiLocal ||
              originalPeerInitialStreamOffsetBidiRemote >
                  conn_->flowControlState
                      .peerAdvertisedInitialMaxStreamOffsetBidiRemote ||
              originalPeerInitialStreamOffsetUni >
                  conn_->flowControlState
                      .peerAdvertisedInitialMaxStreamOffsetUni) {
            return folly::makeUnexpected(QuicError(
                TransportErrorCode::TRANSPORT_PARAMETER_ERROR,
                "Rejection of zero rtt parameters unsupported"));
          }
        }
      }

      updateNegotiatedAckFeatures(*conn_);

      // TODO This sucks, but manually update the max packet size until we fix
      // 0-rtt transport parameters.
      if (conn_->transportSettings.canIgnorePathMTU &&
          clientConn_->zeroRttRejected.has_value() &&
          !*clientConn_->zeroRttRejected) {
        auto updatedPacketSize = getIntegerParameter(
            TransportParameterId::max_packet_size, serverParams->parameters);
        // Clamp to [kDefaultUDPSendPacketLen, kDefaultMaxUDPPayload]; a
        // missing parameter falls back to kDefaultUDPSendPacketLen.
        updatedPacketSize = std::max(
            updatedPacketSize.value_or(kDefaultUDPSendPacketLen),
            kDefaultUDPSendPacketLen);
        updatedPacketSize = std::min(*updatedPacketSize, kDefaultMaxUDPPayload);
        conn_->udpSendPacketLen = *updatedPacketSize;
      }

      // TODO this is another bandaid. Explicitly set the stateless reset token
      // or else conns that use 0-RTT won't be able to parse stateless resets.
      if (!clientConn_->statelessResetToken) {
        clientConn_->statelessResetToken =
            getStatelessResetTokenParameter(serverParams->parameters);
      }
      if (clientConn_->statelessResetToken) {
        conn_->readCodec->setStatelessResetToken(
            clientConn_->statelessResetToken.value());
        auto& cryptoFactory = handshakeLayer->getCryptoFactory();
        conn_->readCodec->setCryptoEqual(
            cryptoFactory.getCryptoEqualFunction());
      }
    }

    if (clientConn_->zeroRttRejected.has_value() &&
        *clientConn_->zeroRttRejected) {
      // TODO: Make sure the alpn is the same, if not then do a full undo of the
      // state.
clientConn_->zeroRttWriteCipher.reset(); clientConn_->zeroRttWriteHeaderCipher.reset(); auto result = markZeroRttPacketsLost(*conn_, markPacketLoss); if (result.hasError()) { return result; } } } updateAckSendStateOnRecvPacket( *conn_, ackState, distanceFromExpectedPacketNum, pktHasRetransmittableData, pktHasCryptoData); if (encryptionLevel == EncryptionLevel::Handshake && conn_->initialWriteCipher) { conn_->initialWriteCipher.reset(); conn_->initialHeaderCipher.reset(); conn_->readCodec->setInitialReadCipher(nullptr); conn_->readCodec->setInitialHeaderCipher(nullptr); implicitAckCryptoStream(*conn_, EncryptionLevel::Initial); } return folly::unit; } folly::Expected QuicClientTransportLite::onReadData( const folly::SocketAddress& peer, ReceivedUdpPacket&& udpPacket) { if (closeState_ == CloseState::CLOSED) { // If we are closed, then we shouldn't process new network data. QUIC_STATS( statsCallback_, onPacketDropped, PacketDropReason::CLIENT_STATE_CLOSED); if (conn_->qLogger) { conn_->qLogger->addPacketDrop(0, kAlreadyClosed); } return folly::unit; } bool waitingForFirstPacket = !hasReceivedUdpPackets(*conn_); auto res = processUdpPacket(peer, std::move(udpPacket)); if (res.hasError()) { return res; } if (connSetupCallback_ && waitingForFirstPacket && hasReceivedUdpPackets(*conn_)) { connSetupCallback_->onFirstPeerPacketProcessed(); } if (!transportReadyNotified_ && hasWriteCipher()) { transportReadyNotified_ = true; connSetupCallback_->onTransportReady(); // This is a new connection. Update QUIC Stats QUIC_STATS(statsCallback_, onNewConnection); } // Checking connSetupCallback_ because application will start to write data // in onTransportReady, if the write fails, QuicSocket can be closed // and connSetupCallback_ is set nullptr. if (connSetupCallback_ && !replaySafeNotified_ && conn_->oneRttWriteCipher) { replaySafeNotified_ = true; // We don't need this any more. 
Also unset it so that we don't allow random // middleboxes to shutdown our connection once we have crypto keys. socket_->setErrMessageCallback(nullptr); connSetupCallback_->onReplaySafe(); } maybeSendTransportKnobs(); return folly::unit; } QuicSocketLite::WriteResult QuicClientTransportLite::writeBufMeta( StreamId /* id */, const BufferMeta& /* data */, bool /* eof */, ByteEventCallback* /* cb */) { return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); } QuicSocketLite::WriteResult QuicClientTransportLite::setDSRPacketizationRequestSender( StreamId /* id */, std::unique_ptr /* sender */) { return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); } folly::Expected QuicClientTransportLite::writeData() { QuicVersion version = conn_->version.value_or(*conn_->originalVersion); const ConnectionId& srcConnId = *conn_->clientConnectionId; const ConnectionId& destConnId = conn_->serverConnectionId.value_or( *clientConn_->initialDestinationConnectionId); if (closeState_ == CloseState::CLOSED) { auto rtt = clientConn_->lossState.srtt == 0us ? 
clientConn_->transportSettings.initialRtt : clientConn_->lossState.srtt; if (clientConn_->lastCloseSentTime && Clock::now() - *clientConn_->lastCloseSentTime < rtt) { return folly::unit; } clientConn_->lastCloseSentTime = Clock::now(); if (clientConn_->clientHandshakeLayer->getPhase() == ClientHandshake::Phase::Established && conn_->oneRttWriteCipher) { CHECK(conn_->oneRttWriteHeaderCipher); writeShortClose( *socket_, *conn_, destConnId, conn_->localConnectionError, *conn_->oneRttWriteCipher, *conn_->oneRttWriteHeaderCipher); } if (conn_->handshakeWriteCipher) { CHECK(conn_->handshakeWriteHeaderCipher); writeLongClose( *socket_, *conn_, srcConnId, destConnId, LongHeader::Types::Handshake, conn_->localConnectionError, *conn_->handshakeWriteCipher, *conn_->handshakeWriteHeaderCipher, version); } if (conn_->initialWriteCipher) { CHECK(conn_->initialHeaderCipher); writeLongClose( *socket_, *conn_, srcConnId, destConnId, LongHeader::Types::Initial, conn_->localConnectionError, *conn_->initialWriteCipher, *conn_->initialHeaderCipher, version); } return folly::unit; } uint64_t packetLimit = (isConnectionPaced(*conn_) ? conn_->pacer->updateAndGetWriteBatchSize(Clock::now()) : conn_->transportSettings.writeConnectionDataPacketsLimit); // At the end of this function, clear out any probe packets credit we didn't // use. SCOPE_EXIT { conn_->pendingEvents.numProbePackets = {}; maybeInitiateKeyUpdate(*conn_); }; if (conn_->initialWriteCipher) { const std::string& token = clientConn_->retryToken.empty() ? 
clientConn_->newToken : clientConn_->retryToken; auto result = handleInitialWriteDataCommon(srcConnId, destConnId, packetLimit, token); if (result.hasError()) { return folly::makeUnexpected(result.error()); } packetLimit -= result->packetsWritten; if (!packetLimit && !conn_->pendingEvents.anyProbePackets()) { return folly::unit; } } if (conn_->handshakeWriteCipher) { auto result = handleHandshakeWriteDataCommon(srcConnId, destConnId, packetLimit); if (result.hasError()) { return folly::makeUnexpected(result.error()); } packetLimit -= result->packetsWritten; if (!packetLimit && !conn_->pendingEvents.anyProbePackets()) { return folly::unit; } } if (clientConn_->zeroRttWriteCipher && !conn_->oneRttWriteCipher) { CHECK(clientConn_->zeroRttWriteHeaderCipher); auto result = writeZeroRttDataToSocket( *socket_, *conn_, srcConnId /* src */, destConnId /* dst */, *clientConn_->zeroRttWriteCipher, *clientConn_->zeroRttWriteHeaderCipher, version, packetLimit); if (result.hasError()) { return folly::makeUnexpected(result.error()); } packetLimit -= *result; } if (!packetLimit && !conn_->pendingEvents.anyProbePackets()) { return folly::unit; } if (conn_->oneRttWriteCipher) { CHECK(clientConn_->oneRttWriteHeaderCipher); auto result = writeQuicDataExceptCryptoStreamToSocket( *socket_, *conn_, srcConnId, destConnId, *conn_->oneRttWriteCipher, *conn_->oneRttWriteHeaderCipher, version, packetLimit); if (result.hasError()) { return folly::makeUnexpected(result.error()); } } return folly::unit; } folly::Expected QuicClientTransportLite::startCryptoHandshake() { auto self = this->shared_from_this(); setIdleTimer(); // We need to update the flow control settings every time we start a crypto // handshake. This is so that we can reset the flow control settings when // we go through version negotiation as well. 
updateFlowControlStateWithSettings(
      conn_->flowControlState, conn_->transportSettings);

  auto handshakeLayer = clientConn_->clientHandshakeLayer;
  auto& cryptoFactory = handshakeLayer->getCryptoFactory();

  // All four Initial ciphers (write/read, payload/header) are created from the
  // initial destination connection id and the original version.
  auto version = conn_->originalVersion.value();
  conn_->initialWriteCipher = cryptoFactory.getClientInitialCipher(
      *clientConn_->initialDestinationConnectionId, version);
  conn_->readCodec->setInitialReadCipher(cryptoFactory.getServerInitialCipher(
      *clientConn_->initialDestinationConnectionId, version));
  conn_->readCodec->setInitialHeaderCipher(
      cryptoFactory.makeServerInitialHeaderCipher(
          *clientConn_->initialDestinationConnectionId, version));
  conn_->initialHeaderCipher = cryptoFactory.makeClientInitialHeaderCipher(
      *clientConn_->initialDestinationConnectionId, version);

  customTransportParameters_ = getSupportedExtTransportParams(*conn_);

  // Transport parameters handed to the handshake layer, populated from the
  // advertised values in transportSettings plus the custom parameters above.
  auto paramsExtension = std::make_shared(
      conn_->originalVersion.value(),
      conn_->transportSettings.advertisedInitialConnectionFlowControlWindow,
      conn_->transportSettings
          .advertisedInitialBidiLocalStreamFlowControlWindow,
      conn_->transportSettings
          .advertisedInitialBidiRemoteStreamFlowControlWindow,
      conn_->transportSettings.advertisedInitialUniStreamFlowControlWindow,
      conn_->transportSettings.advertisedInitialMaxStreamsBidi,
      conn_->transportSettings.advertisedInitialMaxStreamsUni,
      conn_->transportSettings.idleTimeout,
      conn_->transportSettings.ackDelayExponent,
      conn_->transportSettings.maxRecvPacketSize,
      conn_->transportSettings.selfActiveConnectionIdLimit,
      conn_->clientConnectionId.value(),
      customTransportParameters_);
  conn_->transportParametersEncoded = true;
  // If flow-priming bytes are configured and the peer address is known, write
  // them to the socket before the handshake starts. The write result is not
  // checked.
  if (!conn_->transportSettings.flowPriming.empty() &&
      conn_->peerAddress.isInitialized()) {
    auto flowPrimingBuf =
        BufHelpers::copyBuffer(conn_->transportSettings.flowPriming);
    iovec vec[kNumIovecBufferChains];
    size_t iovec_len = fillIovec(flowPrimingBuf, vec);
    socket_->write(conn_->peerAddress, vec, iovec_len);
  }
  handshakeLayer->connect(hostname_, std::move(paramsExtension));

  auto writeResult = writeSocketData();
  if (writeResult.hasError()) {
    return folly::makeUnexpected(writeResult.error());
  }
  // With a 0-RTT write cipher the transport is already usable, so schedule the
  // transport-ready notification asynchronously on the event base. The
  // callback is re-checked inside the lambda because it runs later.
  if (!transportReadyNotified_ && clientConn_->zeroRttWriteCipher) {
    transportReadyNotified_ = true;
    runOnEvbAsync([](auto self) {
      auto clientPtr = dynamic_cast(self.get());
      if (clientPtr->connSetupCallback_) {
        clientPtr->connSetupCallback_->onTransportReady();
      }
    });
  }

  return folly::unit;
}

// True when either a 1-RTT or a 0-RTT write cipher is available.
bool QuicClientTransportLite::hasWriteCipher() const {
  return clientConn_->oneRttWriteCipher || clientConn_->zeroRttWriteCipher;
}

// True when a 0-RTT write cipher is available.
bool QuicClientTransportLite::hasZeroRttWriteCipher() const {
  return clientConn_->zeroRttWriteCipher != nullptr;
}

// Returns a shared pointer to this transport (via shared_from_this()).
std::shared_ptr QuicClientTransportLite::sharedGuard() {
  return shared_from_this();
}

// Forwards to the handshake layer's isTLSResumed().
bool QuicClientTransportLite::isTLSResumed() const {
  return clientConn_->clientHandshakeLayer->isTLSResumed();
}

// Handles a control message from the socket error queue. Only IP_RECVERR /
// IPV6_RECVERR messages are acted upon, and the whole body is compiled only
// when FOLLY_HAVE_MSG_ERRQUEUE is defined.
void QuicClientTransportLite::errMessage(
    [[maybe_unused]] const cmsghdr& cmsg) noexcept {
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
  if ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
      (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
    // Time to make some assumptions. We assume the first socket == IPv6, if it
    // exists, and the second socket is IPv4. Then we basically do the same
    // thing we would have done if we'd gotten a write error on that socket.
    // If both sockets are not functional we close the connection.
auto& happyEyeballsState = clientConn_->happyEyeballsState;
    if (!happyEyeballsState.finished) {
      if (cmsg.cmsg_level == SOL_IPV6 &&
          happyEyeballsState.shouldWriteToFirstSocket) {
        // Error on the first socket: stop using it and, if the connection
        // attempt delay timer is still pending, fire it right away.
        happyEyeballsState.shouldWriteToFirstSocket = false;
        socket_->pauseRead();
        if (happyEyeballsState.connAttemptDelayTimeout &&
            isTimeoutScheduled(happyEyeballsState.connAttemptDelayTimeout)) {
          happyEyeballsState.connAttemptDelayTimeout->timeoutExpired();
          cancelTimeout(happyEyeballsState.connAttemptDelayTimeout);
        }
      } else if (
          cmsg.cmsg_level == SOL_IP &&
          happyEyeballsState.shouldWriteToSecondSocket) {
        // Error on the second socket: stop using it.
        happyEyeballsState.shouldWriteToSecondSocket = false;
        happyEyeballsState.secondSocket->pauseRead();
      }
    }

    const struct sock_extended_err* serr =
        reinterpret_cast(CMSG_DATA(&cmsg));
    auto errStr = folly::errnoStr(serr->ee_errno);
    // Neither socket is writable any more: close the connection
    // asynchronously with CONNECT_FAILED and the errno text as the message.
    if (!happyEyeballsState.shouldWriteToFirstSocket &&
        !happyEyeballsState.shouldWriteToSecondSocket) {
      runOnEvbAsync([errString = std::move(errStr)](auto self) mutable {
        auto quicError = QuicError(
            QuicErrorCode(LocalErrorCode::CONNECT_FAILED),
            std::move(errString));
        auto clientPtr = dynamic_cast(self.get());
        clientPtr->closeImpl(std::move(quicError), false, false);
      });
    }
  }
#endif
}

// Socket read-error callback. While the transport is still OPEN, schedules an
// asynchronous closeNow() with CONNECTION_ABANDONED and the exception text.
// Does nothing in any other close state.
void QuicClientTransportLite::onReadError(
    const folly::AsyncSocketException& ex) noexcept {
  if (closeState_ == CloseState::OPEN) {
    // closeNow will skip draining the socket. onReadError doesn't get
    // triggered by retriable errors. If we are here, there is no point of
    // draining the socket.
runOnEvbAsync([ex](auto self) {
      auto clientPtr = dynamic_cast(self.get());
      clientPtr->closeNow(QuicError(
          QuicErrorCode(LocalErrorCode::CONNECTION_ABANDONED),
          std::string(ex.what())));
    });
  }
}

// Unsupported read-callback entry point: terminates the process if called.
void QuicClientTransportLite::getReadBuffer(
    void** /* buf */,
    size_t* /* len */) noexcept {
  folly::terminate_with("getReadBuffer unsupported");
}

// Unsupported read-callback entry point: terminates the process if called.
void QuicClientTransportLite::onDataAvailable(
    const folly::SocketAddress& /* server */,
    size_t /* len */,
    bool /* truncated */,
    OnDataAvailableParams /* params */) noexcept {
  folly::terminate_with("onDataAvailable unsupported");
}

// Always true. Presumably this makes the socket report readability through
// onNotifyDataAvailable() instead of reading itself; confirm against
// QuicAsyncUDPSocket.
bool QuicClientTransportLite::shouldOnlyNotify() {
  return true;
}

// Reads up to `numPackets` datagrams from `sock` with recvmsg(), one freshly
// allocated buffer of `readBufferSize` bytes per call, and appends them to
// `networkData`.
//
// `server` is filled in from the first datagram's source address if it is not
// already set, and `totalData` is increased by the number of bytes read.
// Reading stops on a retriable error (EAGAIN / EWOULDBLOCK) or a zero-length
// read. On any other error, reads on the socket are paused and onReadError()
// is invoked. When GRO is in effect (params.gro > 0), one read is split into
// several packets of params.gro bytes each.
void QuicClientTransportLite::recvMsg(
    QuicAsyncUDPSocket& sock,
    uint64_t readBufferSize,
    int numPackets,
    NetworkData& networkData,
    Optional& server,
    size_t& totalData) {
  for (int packetNum = 0; packetNum < numPackets; ++packetNum) {
    // We create 1 buffer per packet so that it is not shared, this enables
    // us to decrypt in place. If the fizz decrypt api could decrypt in-place
    // even if shared, then we could allocate one giant IOBuf here.
    Buf readBuffer = BufHelpers::createCombined(readBufferSize);
    struct iovec vec;
    vec.iov_base = readBuffer->writableData();
    vec.iov_len = readBufferSize;

    // The source address is only requested while the server address is still
    // unknown.
    sockaddr* rawAddr{nullptr};
    struct sockaddr_storage addrStorage {};
    if (!server) {
      rawAddr = reinterpret_cast(&addrStorage);
      rawAddr->sa_family = sock.getLocalAddressFamily();
    }

    int flags = 0;
    QuicAsyncUDPSocket::ReadCallback::OnDataAvailableParams params;
    struct msghdr msg {};
    msg.msg_name = rawAddr;
    msg.msg_namelen = rawAddr ? kAddrLen : 0;
    msg.msg_iov = &vec;
    msg.msg_iovlen = 1;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    // Control messages are only requested when GRO, timestamping or TOS
    // reporting is enabled on the socket.
    bool useGRO = sock.getGRO() > 0;
    bool useTs = sock.getTimestamping() > 0;
    bool recvTos = sock.getRecvTos();
    bool checkCmsgs = useGRO || useTs || recvTos;
    char control
        [QuicAsyncUDPSocket::ReadCallback::OnDataAvailableParams::kCmsgSpace] =
            {};

    if (checkCmsgs) {
      msg.msg_control = control;
      msg.msg_controllen = sizeof(control);

      // we need to consider MSG_TRUNC too
      flags |= MSG_TRUNC;
    }
#endif

    ssize_t ret = sock.recvmsg(&msg, flags);
    if (ret < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // If we got a retriable error, let us continue.
        if (conn_->loopDetectorCallback) {
          conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR;
        }
        break;
      }
      // If we got a non-retriable error, we might have received
      // a packet that we could process, however let's just quit early.
      sock.pauseRead();
      if (conn_->loopDetectorCallback) {
        conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR;
      }
      return onReadError(folly::AsyncSocketException(
          folly::AsyncSocketException::INTERNAL_ERROR,
          "::recvmsg() failed",
          errno));
    } else if (ret == 0) {
      break;
    }
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    if (checkCmsgs) {
      QuicAsyncUDPSocket::fromMsg(params, msg);

      // truncated
      // With MSG_TRUNC the returned length can exceed the buffer size. Clamp
      // it to the buffer and, with GRO, round down to a whole number of
      // segments.
      if ((size_t)ret > readBufferSize) {
        ret = readBufferSize;
        if (params.gro > 0) {
          ret = ret - ret % params.gro;
        }
      }
    }
#endif
    ReceivedUdpPacket::Timings timings;
    if (params.ts.has_value()) {
      timings.maybeSoftwareTs =
          QuicAsyncUDPSocket::convertToSocketTimestampExt(*params.ts);
    }

    size_t bytesRead = size_t(ret);
    totalData += bytesRead;
    if (!server) {
      server = folly::SocketAddress();
      server->setFromSockaddr(rawAddr, kAddrLen);
    }
    VLOG(10) << "Got data from socket peer=" << *server
             << " len=" << bytesRead;
    readBuffer->append(bytesRead);
    if (params.gro > 0) {
      // Split the read into segments of params.gro bytes, one packet each.
      size_t len = bytesRead;
      size_t remaining = len;
      size_t offset = 0;
      size_t totalNumPackets = networkData.getPackets().size() +
          ((len + params.gro - 1) / params.gro);
networkData.reserve(totalNumPackets); while (remaining) { if (static_cast(remaining) > params.gro) { auto tmp = readBuffer->cloneOne(); // start at offset tmp->trimStart(offset); // the actual len is len - offset now // leave gro bytes tmp->trimEnd(len - offset - params.gro); DCHECK_EQ(tmp->length(), params.gro); offset += params.gro; remaining -= params.gro; networkData.addPacket( ReceivedUdpPacket(std::move(tmp), timings, params.tos)); } else { // do not clone the last packet // start at offset, use all the remaining data readBuffer->trimStart(offset); DCHECK_EQ(readBuffer->length(), remaining); remaining = 0; networkData.addPacket( ReceivedUdpPacket(std::move(readBuffer), timings, params.tos)); } } } else { networkData.addPacket( ReceivedUdpPacket(std::move(readBuffer), timings, params.tos)); } maybeQlogDatagram(bytesRead); } trackDatagramsReceived( networkData.getPackets().size(), networkData.getTotalData()); } void QuicClientTransportLite::recvFrom( QuicAsyncUDPSocket& sock, uint64_t readBufferSize, int numPackets, NetworkData& networkData, Optional& server, size_t& totalData) { for (int packetNum = 0; packetNum < numPackets; ++packetNum) { // We create 1 buffer per packet so that it is not shared, this enables // us to decrypt in place. If the fizz decrypt api could decrypt in-place // even if shared, then we could allocate one giant IOBuf here. Buf readBuffer = BufHelpers::createCombined(readBufferSize); sockaddr* rawAddr{nullptr}; struct sockaddr_storage addrStorage {}; if (!server) { rawAddr = reinterpret_cast(&addrStorage); rawAddr->sa_family = sock.getLocalAddressFamily(); } ssize_t ret = sock.recvfrom(readBuffer->writableData(), readBufferSize, &addrStorage); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { // If we got a retriable error, let us continue. 
if (conn_->loopDetectorCallback) { conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR; } break; } // If we got a non-retriable error, we might have received // a packet that we could process, however let's just quit early. sock.pauseRead(); if (conn_->loopDetectorCallback) { conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR; } return onReadError(folly::AsyncSocketException( folly::AsyncSocketException::INTERNAL_ERROR, "::recvmsg() failed", errno)); } else if (ret == 0) { break; } size_t bytesRead = size_t(ret); totalData += bytesRead; if (!server) { server = folly::SocketAddress(); server->setFromSockaddr(rawAddr, kAddrLen); } VLOG(10) << "Got data from socket peer=" << *server << " len=" << bytesRead; readBuffer->append(bytesRead); networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer))); } trackDatagramsReceived( networkData.getPackets().size(), networkData.getTotalData()); } void QuicClientTransportLite::recvMmsg( QuicAsyncUDPSocket& sock, uint64_t readBufferSize, uint16_t numPackets, NetworkData& networkData, Optional& server, size_t& totalData) { auto& msgs = recvmmsgStorage_.msgs; int flags = 0; #ifdef FOLLY_HAVE_MSG_ERRQUEUE bool useGRO = sock.getGRO() > 0; bool useTs = sock.getTimestamping() > 0; bool recvTos = sock.getRecvTos(); bool checkCmsgs = useGRO || useTs || recvTos; std::vector> controlVec(checkCmsgs ? 
numPackets : 0);

  // we need to consider MSG_TRUNC too
  if (useGRO) {
    flags |= MSG_TRUNC;
  }
#endif
  // Prepare one msghdr per slot. A slot's read buffer is only (re)allocated
  // when it is empty, i.e. on first use or after its buffer was moved into a
  // packet below.
  for (uint16_t i = 0; i < numPackets; ++i) {
    auto& addr = recvmmsgStorage_.impl_[i].addr;
    auto& readBuffer = recvmmsgStorage_.impl_[i].readBuffer;
    auto& iovec = recvmmsgStorage_.impl_[i].iovec;
    struct msghdr* msg = &msgs[i].msg_hdr;

    if (!readBuffer) {
      readBuffer = BufHelpers::createCombined(readBufferSize);
      iovec.iov_base = readBuffer->writableData();
      iovec.iov_len = readBufferSize;
      msg->msg_iov = &iovec;
      msg->msg_iovlen = 1;
    }
    CHECK(readBuffer != nullptr);

    auto* rawAddr = reinterpret_cast(&addr);
    rawAddr->sa_family = sock.address().getFamily();
    msg->msg_name = rawAddr;
    msg->msg_namelen = kAddrLen;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    if (checkCmsgs) {
      ::memset(controlVec[i].data(), 0, controlVec[i].size());
      msg->msg_control = controlVec[i].data();
      msg->msg_controllen = controlVec[i].size();
    }
#endif
  }

  int numMsgsRecvd = sock.recvmmsg(msgs.data(), numPackets, flags, nullptr);
  if (numMsgsRecvd < 0) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // Exit, socket will notify us again when socket is readable.
      if (conn_->loopDetectorCallback) {
        conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR;
      }
      return;
    }
    // If we got a non-retriable error, we might have received
    // a packet that we could process, however let's just quit early.
    sock.pauseRead();
    if (conn_->loopDetectorCallback) {
      conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR;
    }
    return onReadError(folly::AsyncSocketException(
        folly::AsyncSocketException::INTERNAL_ERROR,
        "::recvmmsg() failed",
        errno));
  }

  CHECK_LE(numMsgsRecvd, numPackets);
  for (uint16_t i = 0; i < static_cast(numMsgsRecvd); ++i) {
    auto& addr = recvmmsgStorage_.impl_[i].addr;
    auto& readBuffer = recvmmsgStorage_.impl_[i].readBuffer;
    auto& msg = msgs[i];

    size_t bytesRead = msg.msg_len;
    if (bytesRead == 0) {
      // Empty datagram, this is probably garbage matching our tuple, we
      // should ignore such datagrams.
      continue;
    }
    QuicAsyncUDPSocket::ReadCallback::OnDataAvailableParams params;
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    if (checkCmsgs) {
      QuicAsyncUDPSocket::fromMsg(params, msg.msg_hdr);

      // truncated
      // Clamp the reported length to the buffer size and, with GRO, round
      // down to a whole number of segments.
      if (bytesRead > readBufferSize) {
        bytesRead = readBufferSize;
        if (params.gro > 0) {
          bytesRead = bytesRead - bytesRead % params.gro;
        }
      }
    }
#endif
    totalData += bytesRead;

    if (!server) {
      server.emplace(folly::SocketAddress());
      auto* rawAddr = reinterpret_cast(&addr);
      server->setFromSockaddr(rawAddr, kAddrLen);
    }

    ReceivedUdpPacket::Timings timings;
    if (params.ts.has_value()) {
      timings.maybeSoftwareTs =
          QuicAsyncUDPSocket::convertToSocketTimestampExt(*params.ts);
    }

    VLOG(10) << "Got data from socket peer=" << *server
             << " len=" << bytesRead;
    readBuffer->append(bytesRead);
    if (params.gro > 0) {
      // Split the read into segments of params.gro bytes, one packet each.
      size_t len = bytesRead;
      size_t remaining = len;
      size_t offset = 0;
      size_t totalNumPackets = networkData.getPackets().size() +
          ((len + params.gro - 1) / params.gro);
      networkData.reserve(totalNumPackets);
      while (remaining) {
        if (static_cast(remaining) > params.gro) {
          auto tmp = readBuffer->cloneOne();
          // start at offset
          tmp->trimStart(offset);
          // the actual len is len - offset now
          // leave gro bytes
          tmp->trimEnd(len - offset - params.gro);
          DCHECK_EQ(tmp->length(), params.gro);

          offset += params.gro;
          remaining -= params.gro;
          networkData.addPacket(
              ReceivedUdpPacket(std::move(tmp), timings, params.tos));
        } else {
          // do not clone the last packet
          // start at offset, use all the remaining data
          readBuffer->trimStart(offset);
          DCHECK_EQ(readBuffer->length(), remaining);
          remaining = 0;
          networkData.addPacket(
              ReceivedUdpPacket(std::move(readBuffer), timings, params.tos));
        }
      }
    } else {
      networkData.addPacket(
          ReceivedUdpPacket(std::move(readBuffer), timings, params.tos));
    }

    maybeQlogDatagram(bytesRead);
  }
  trackDatagramsReceived(
      networkData.getPackets().size(), networkData.getTotalData());
}

// Hands the packets collected by one of the recv* helpers to onNetworkData(),
// stamping them with the current time as their receive time.
//
// If `networkData` holds no packets, nothing is processed. In that case, when
// a loop detector callback is installed, the no-read reason is set to
// EMPTY_DATA unless a more specific reason was already recorded, and the
// callback is told about the suspicious read loop.
void QuicClientTransportLite::processPackets(
    NetworkData&& networkData,
    const Optional& server) {
  if (networkData.getPackets().empty()) {
    // recvMmsg and recvMsg might have already set the reason and counter
    if (conn_->loopDetectorCallback) {
      if (conn_->readDebugState.noReadReason == NoReadReason::READ_OK) {
        conn_->readDebugState.noReadReason = NoReadReason::EMPTY_DATA;
      }
      if (conn_->readDebugState.noReadReason != NoReadReason::READ_OK) {
        conn_->loopDetectorCallback->onSuspiciousReadLoops(
            ++conn_->readDebugState.loopCount,
            conn_->readDebugState.noReadReason);
      }
    }
    return;
  }
  DCHECK(server.has_value());
  // TODO: we can get better receive time accuracy than this, with
  // SO_TIMESTAMP or SIOCGSTAMP.
  auto packetReceiveTime = Clock::now();
  networkData.setReceiveTimePoint(packetReceiveTime);
  onNetworkData(*server, std::move(networkData));
}

// Reads and processes datagrams one at a time: up to
// transportSettings.maxRecvBatchSize iterations of recvMsg() for a single
// packet followed by processPackets(). The loop stops when a read yields no
// packet, and the function returns immediately if socket_ has become null
// after either step. After the loop the post-processing callbacks and looper
// updates are run once.
void QuicClientTransportLite::readWithRecvmsgSinglePacketLoop(
    QuicAsyncUDPSocket& sock,
    uint64_t readBufferSize) {
  size_t totalData = 0;
  Optional server;
  for (size_t i = 0; i < conn_->transportSettings.maxRecvBatchSize; i++) {
    auto networkDataSinglePacket = NetworkData();
    networkDataSinglePacket.reserve(1);
    recvMsg(
        sock,
        readBufferSize,
        1 /* numPackets */,
        networkDataSinglePacket,
        server,
        totalData);
    if (!socket_) {
      // Socket has been closed.
      return;
    }

    if (networkDataSinglePacket.getPackets().size() == 0) {
      break;
    }
    processPackets(std::move(networkDataSinglePacket), server);
    if (!socket_) {
      // Socket has been closed.
      return;
    }
  }

  // Call callbacks/updates manually because processPackets()/onNetworkData()
  // will not schedule it when transportSettings.networkDataPerSocketRead is on.
processCallbacksAfterNetworkData(); checkForClosedStream(); updateReadLooper(); updateWriteLooper(true); } void QuicClientTransportLite::onNotifyDataAvailable( QuicAsyncUDPSocket& sock) noexcept { auto self = this->shared_from_this(); CHECK(conn_) << "trying to receive packets without a connection"; auto readBufferSize = conn_->transportSettings.maxRecvPacketSize * numGROBuffers_; const size_t readAllocSize = conn_->transportSettings.readCoalescingSize > kDefaultUDPSendPacketLen ? conn_->transportSettings.readCoalescingSize : readBufferSize; readWithRecvmsgSinglePacketLoop(sock, readAllocSize); } void QuicClientTransportLite:: happyEyeballsConnAttemptDelayTimeoutExpired() noexcept { // Declare 0-RTT data as lost so that they will be retransmitted over the // second socket. happyEyeballsStartSecondSocket(clientConn_->happyEyeballsState); // If this gets called from the write path then we haven't added the packets // to the outstanding packet list yet. runOnEvbAsync([&](auto) { auto result = markZeroRttPacketsLost(*conn_, markPacketLoss); LOG_IF(ERROR, result.hasError()) << "Failed to mark 0-RTT packets as lost."; }); } void QuicClientTransportLite::start( ConnectionSetupCallback* connSetupCb, ConnectionCallback* connCb) { if (happyEyeballsEnabled_) { // TODO Supply v4 delay amount from somewhere when we want to tune this startHappyEyeballs( *clientConn_, evb_.get(), happyEyeballsCachedFamily_, happyEyeballsConnAttemptDelayTimeout_, happyEyeballsCachedFamily_ == AF_UNSPEC ? 
kHappyEyeballsV4Delay : kHappyEyeballsConnAttemptDelayWithCache, this, this, socketOptions_); } CHECK(conn_->peerAddress.isInitialized()); if (conn_->qLogger) { conn_->qLogger->addTransportStateUpdate(kStart); } setConnectionSetupCallback(connSetupCb); setConnectionCallback(connCb); clientConn_->pendingOneRttData.reserve( conn_->transportSettings.maxPacketsToBuffer); try { happyEyeballsSetUpSocket( *socket_, conn_->localAddress, conn_->peerAddress, conn_->transportSettings, conn_->socketTos.value, this, this, socketOptions_); // adjust the GRO buffers adjustGROBuffers(); auto handshakeResult = startCryptoHandshake(); if (handshakeResult.hasError()) { runOnEvbAsync([error = handshakeResult.error()](auto self) { auto clientPtr = dynamic_cast(self.get()); clientPtr->closeImpl(error); }); } } catch (const QuicTransportException& ex) { runOnEvbAsync([ex](auto self) { auto clientPtr = dynamic_cast(self.get()); clientPtr->closeImpl( QuicError(QuicErrorCode(ex.errorCode()), std::string(ex.what()))); }); } catch (const QuicInternalException& ex) { runOnEvbAsync([ex](auto self) { auto clientPtr = dynamic_cast(self.get()); clientPtr->closeImpl( QuicError(QuicErrorCode(ex.errorCode()), std::string(ex.what()))); }); } catch (const std::exception& ex) { LOG(ERROR) << "Connect failed " << ex.what(); runOnEvbAsync([ex](auto self) { auto clientPtr = dynamic_cast(self.get()); clientPtr->closeImpl(QuicError( QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), std::string(ex.what()))); }); } } void QuicClientTransportLite::addNewPeerAddress( folly::SocketAddress peerAddress) { CHECK(peerAddress.isInitialized()); if (happyEyeballsEnabled_) { conn_->udpSendPacketLen = std::min( conn_->udpSendPacketLen, (peerAddress.getFamily() == AF_INET6 ? kDefaultV6UDPSendPacketLen : kDefaultV4UDPSendPacketLen)); happyEyeballsAddPeerAddress(*clientConn_, peerAddress); return; } conn_->udpSendPacketLen = peerAddress.getFamily() == AF_INET6 ? 
kDefaultV6UDPSendPacketLen : kDefaultV4UDPSendPacketLen;
  conn_->originalPeerAddress = peerAddress;
  conn_->peerAddress = std::move(peerAddress);
}

// Sets the local address stored on the connection. The address must be
// initialized (CHECKed).
void QuicClientTransportLite::setLocalAddress(
    folly::SocketAddress localAddress) {
  CHECK(localAddress.isInitialized());
  conn_->localAddress = std::move(localAddress);
}

// Enables or disables happy eyeballs; read by start() and addNewPeerAddress().
void QuicClientTransportLite::setHappyEyeballsEnabled(
    bool happyEyeballsEnabled) {
  happyEyeballsEnabled_ = happyEyeballsEnabled;
}

// Stores the cached address family that start() passes to
// startHappyEyeballs().
void QuicClientTransportLite::setHappyEyeballsCachedFamily(
    sa_family_t cachedFamily) {
  happyEyeballsCachedFamily_ = cachedFamily;
}

// Hands an additional socket to the happy-eyeballs state.
void QuicClientTransportLite::addNewSocket(
    std::unique_ptr socket) {
  happyEyeballsAddSocket(*clientConn_, std::move(socket));
}

// Stores the hostname passed to the handshake layer's connect().
void QuicClientTransportLite::setHostname(const std::string& hostname) {
  hostname_ = hostname;
}

// Makes the transport hold a shared reference to itself; cleared again by
// unbindConnection().
void QuicClientTransportLite::setSelfOwning() {
  selfOwning_ = shared_from_this();
}

// If more than kDefaultNumGROBuffers GRO buffers are configured, tries to
// enable GRO on the socket and, when getGRO() then reports a positive value,
// sets numGROBuffers_ to the configured count capped at kMaxNumGROBuffers.
// Does nothing without a socket or a connection.
void QuicClientTransportLite::adjustGROBuffers() {
  if (socket_ && conn_) {
    if (conn_->transportSettings.numGROBuffers_ > kDefaultNumGROBuffers) {
      socket_->setGRO(true);
      auto ret = socket_->getGRO();

      if (ret > 0) {
        numGROBuffers_ =
            (conn_->transportSettings.numGROBuffers_ < kMaxNumGROBuffers)
            ? conn_->transportSettings.numGROBuffers_
            : kMaxNumGROBuffers;
      }
    }
  }
}

// Cancels the happy-eyeballs connection attempt delay timeout.
void QuicClientTransportLite::closeTransport() {
  cancelTimeout(&happyEyeballsConnAttemptDelayTimeout_);
}

// Drops the self-owning reference taken by setSelfOwning().
void QuicClientTransportLite::unbindConnection() {
  selfOwning_ = nullptr;
}

// Uses the first entry of `versions` as the connection's original version and
// updates the read codec parameters to match. `versions.at(0)` throws
// std::out_of_range when the list is empty.
void QuicClientTransportLite::setSupportedVersions(
    const std::vector& versions) {
  auto version = versions.at(0);
  conn_->originalVersion = version;
  auto params = conn_->readCodec->getCodecParameters();
  params.version = conn_->originalVersion.value();
  conn_->readCodec->setCodecParameters(params);
}

// Replaces the current socket with `newSock`.
//
// Ignored until a 1-RTT write cipher exists, and when either the current or
// the new socket is null. The old socket is detached from the error-message
// callback, paused and closed. The new one gets the additional-cmsgs hook and
// the standard socket setup, a qlog migration update is recorded, and the GRO
// buffers are re-adjusted.
void QuicClientTransportLite::onNetworkSwitch(
    std::unique_ptr newSock) {
  if (!conn_->oneRttWriteCipher) {
    return;
  }
  if (socket_ && newSock) {
    auto sock = std::move(socket_);
    socket_ = nullptr;
    sock->setErrMessageCallback(nullptr);
    sock->pauseRead();
    sock->close();

    socket_ = std::move(newSock);
    socket_->setAdditionalCmsgsFunc(
        [&]() { return getAdditionalCmsgsForAsyncUDPSocket(); });
    happyEyeballsSetUpSocket(
        *socket_,
        conn_->localAddress,
        conn_->peerAddress,
        conn_->transportSettings,
        conn_->socketTos.value,
        this,
        this,
        socketOptions_);
    if (conn_->qLogger) {
      conn_->qLogger->addConnectionMigrationUpdate(true);
    }

    // adjust the GRO buffers
    adjustGROBuffers();
  }
}

// Installs (or clears) the transport stats callback on the transport and the
// connection. A non-null callback is also installed on the read codec; the
// codec's callback is left untouched when `statsCallback` is null.
void QuicClientTransportLite::setTransportStatsCallback(
    std::shared_ptr statsCallback) noexcept {
  CHECK(conn_);
  statsCallback_ = std::move(statsCallback);
  if (statsCallback_) {
    conn_->statsCallback = statsCallback_.get();
    conn_->readCodec->setConnectionStatsCallback(statsCallback_.get());
  } else {
    conn_->statsCallback = nullptr;
  }
}

// Records a received datagram of `len` bytes in qlog, if a qLogger is set.
void QuicClientTransportLite::maybeQlogDatagram(size_t len) {
  if (conn_->qLogger) {
    conn_->qLogger->addDatagramReceived(len);
  }
}

// Reports the number of packets and bytes read to the stats callback.
void QuicClientTransportLite::trackDatagramsReceived(
    uint32_t totalPackets,
    uint32_t totalPacketLen) {
  QUIC_STATS(statsCallback_, onPacketsReceived, totalPackets);
  QUIC_STATS(statsCallback_, onRead, totalPacketLen);
}

// Sends the configured transport knobs once, as soon as a write cipher is
// available. Sending stops at the first knob that fails, and the knobs are
// marked as sent either way, so no retry happens.
void QuicClientTransportLite::maybeSendTransportKnobs() {
  if (!transportKnobsSent_ && hasWriteCipher()) {
    for (const auto& knob : conn_->transportSettings.knobs) {
      auto res =
          setKnob(knob.space, knob.id, BufHelpers::copyBuffer(knob.blob));
      if (res.hasError()) {
        if (res.error() != LocalErrorCode::KNOB_FRAME_UNSUPPORTED) {
          LOG(ERROR) << "Unexpected error while sending knob frames";
        }
        // No point in keep trying if transport does not support knob frame
        break;
      }
    }
    transportKnobsSent_ = true;
  }
}

// Returns the server's transport parameters as reported by the handshake
// layer, or none when there is no handshake layer or it has none yet.
Optional>
QuicClientTransportLite::getPeerTransportParams() const {
  if (clientConn_ && clientConn_->clientHandshakeLayer) {
    auto maybeParams =
        clientConn_->clientHandshakeLayer->getServerTransportParams();
    if (maybeParams) {
      return maybeParams->parameters;
    }
  }
  return none;
}

// Sets the congestion control type, first creating a default per-transport
// congestion controller factory (with a warning) if none has been set.
void QuicClientTransportLite::setCongestionControl(CongestionControlType type) {
  if (!conn_->congestionControllerFactory) {
    // If you are hitting this, update your application to call
    // setCongestionControllerFactory() on the transport and share one factory
    // for all transports.
    conn_->congestionControllerFactory =
        std::make_shared();
    LOG(WARNING)
        << "A congestion controller factory is not set. Using a default per-transport instance.";
  }
  QuicTransportBaseLite::setCongestionControl(type);
}

// Resizes both recvmmsg storage vectors to `numPackets` entries when the
// current size differs.
void QuicClientTransportLite::RecvmmsgStorage::resize(size_t numPackets) {
  if (msgs.size() != numPackets) {
    msgs.resize(numPackets);
    impl_.resize(numPackets);
  }
}

// The getters below return counters stored on the connection state.
uint64_t QuicClientTransportLite::getNumAckFramesSent() const {
  return conn_->numAckFramesSent;
}

uint64_t QuicClientTransportLite::getNumFlowControlFramesSent() const {
  return conn_->numWindowUpdateFramesSent;
}

uint64_t QuicClientTransportLite::getNumPingFramesSent() const {
  return conn_->numPingFramesSent;
}

uint64_t QuicClientTransportLite::getEagainOrEwouldblockCount() const {
  return conn_->eagainOrEwouldblockCount;
}

uint64_t QuicClientTransportLite::getEnobufsCount() const {
  return conn_->enobufsCount;
}

uint64_t QuicClientTransportLite::getPtoCount() const {
  return conn_->lossState.ptoCount;
}

uint64_t QuicClientTransportLite::getPacketsSentCount() const {
  return conn_->lossState.totalPacketsSent;
}

// True when a socket exists and its reads are not paused.
bool QuicClientTransportLite::canRead() const {
  return socket_ && !socket_->isReadPaused();
}

// The accessors below forward to the client handshake layer without checking
// it for null.
std::optional QuicClientTransportLite::getHandshakeStatus() const {
  return clientConn_->clientHandshakeLayer->getHandshakeStatus();
}

size_t QuicClientTransportLite::getInitialReadBufferSize() const {
  return clientConn_->clientHandshakeLayer->getInitialReadBufferSize();
}

size_t QuicClientTransportLite::getHandshakeReadBufferSize() const {
  return clientConn_->clientHandshakeLayer->getHandshakeReadBufferSize();
}

size_t QuicClientTransportLite::getAppDataReadBufferSize() const {
  return clientConn_->clientHandshakeLayer->getAppDataReadBufferSize();
}

EncryptionLevel QuicClientTransportLite::getReadEncryptionLevel() const {
  return clientConn_->clientHandshakeLayer->getReadRecordLayerEncryptionLevel();
}

bool QuicClientTransportLite::waitingForHandshakeData() const {
  return clientConn_->clientHandshakeLayer->waitingForData();
}

// Returns the peer certificate from the handshake layer, or nullptr when
// there is no handshake layer.
const std::shared_ptr
QuicClientTransportLite::getPeerCertificate() const {
  const auto clientHandshakeLayer = clientConn_->clientHandshakeLayer;
  if (clientHandshakeLayer) {
    return clientHandshakeLayer->getPeerCertificate();
  }
  return nullptr;
}

} // namespace quic