diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 6b56ddb2c..2ffc891ce 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -47,8 +47,10 @@ QuicTransportBase::QuicTransportBase( std::shared_ptr evb, std::unique_ptr socket, bool useConnectionEndWithErrorCallback) - : QuicTransportBaseLite(std::move(evb), useConnectionEndWithErrorCallback), - socket_(std::move(socket)), + : QuicTransportBaseLite( + std::move(evb), + std::move(socket), + useConnectionEndWithErrorCallback), lossTimeout_(this), ackTimeout_(this), pathValidationTimeout_(this), @@ -2968,116 +2970,6 @@ QuicTransportBase::readDatagramBufs(size_t atMost) { return retDatagrams; } -void QuicTransportBase::writeSocketData() { - if (socket_) { - ++(conn_->writeCount); // incremented on each write (or write attempt) - - // record current number of sent packets to detect delta - const auto beforeTotalBytesSent = conn_->lossState.totalBytesSent; - const auto beforeTotalPacketsSent = conn_->lossState.totalPacketsSent; - const auto beforeTotalAckElicitingPacketsSent = - conn_->lossState.totalAckElicitingPacketsSent; - const auto beforeNumOutstandingPackets = - conn_->outstandings.numOutstanding(); - - updatePacketProcessorsPrewriteRequests(); - - // if we're starting to write from app limited, notify observers - if (conn_->appLimitedTracker.isAppLimited() && - conn_->congestionController) { - conn_->appLimitedTracker.setNotAppLimited(); - notifyStartWritingFromAppRateLimited(); - } - writeData(); - if (closeState_ != CloseState::CLOSED) { - if (conn_->pendingEvents.closeTransport == true) { - throw QuicTransportException( - "Max packet number reached", - TransportErrorCode::PROTOCOL_VIOLATION); - } - setLossDetectionAlarm(*conn_, *this); - - // check for change in number of packets - const auto afterTotalBytesSent = conn_->lossState.totalBytesSent; - const auto afterTotalPacketsSent = conn_->lossState.totalPacketsSent; - const auto afterTotalAckElicitingPacketsSent = - conn_->lossState.totalAckElicitingPacketsSent; - const auto afterNumOutstandingPackets = - conn_->outstandings.numOutstanding(); - CHECK_LE(beforeTotalPacketsSent, afterTotalPacketsSent); - CHECK_LE( - beforeTotalAckElicitingPacketsSent, - afterTotalAckElicitingPacketsSent); - CHECK_LE(beforeNumOutstandingPackets, afterNumOutstandingPackets); - CHECK_EQ( - afterNumOutstandingPackets - beforeNumOutstandingPackets, - afterTotalAckElicitingPacketsSent - - beforeTotalAckElicitingPacketsSent); - const bool newPackets = (afterTotalPacketsSent > beforeTotalPacketsSent); - const bool newOutstandingPackets = - (afterTotalAckElicitingPacketsSent > - beforeTotalAckElicitingPacketsSent); - - // if packets sent, notify observers - if (newPackets) { - notifyPacketsWritten( - afterTotalPacketsSent - beforeTotalPacketsSent - /* numPacketsWritten */, - afterTotalAckElicitingPacketsSent - - beforeTotalAckElicitingPacketsSent - /* numAckElicitingPacketsWritten */, - afterTotalBytesSent - beforeTotalBytesSent /* numBytesWritten */); - } - if (conn_->loopDetectorCallback && newOutstandingPackets) { - conn_->writeDebugState.currentEmptyLoopCount = 0; - } else if ( - conn_->writeDebugState.needsWriteLoopDetect && - conn_->loopDetectorCallback) { - // TODO: Currently we will to get some stats first. Then we may filter - // out some errors here. For example, socket fail to write might be a - // legit case to filter out. - conn_->loopDetectorCallback->onSuspiciousWriteLoops( - ++conn_->writeDebugState.currentEmptyLoopCount, - conn_->writeDebugState.writeDataReason, - conn_->writeDebugState.noWriteReason, - conn_->writeDebugState.schedulerName); - } - // If we sent a new packet and the new packet was either the first - // packet after quiescence or after receiving a new packet. - if (newOutstandingPackets && - (beforeNumOutstandingPackets == 0 || - conn_->receivedNewPacketBeforeWrite)) { - // Reset the idle timer because we sent some data. - setIdleTimer(); - conn_->receivedNewPacketBeforeWrite = false; - } - // Check if we are app-limited after finish this round of sending - auto currentSendBufLen = conn_->flowControlState.sumCurStreamBufferLen; - auto lossBufferEmpty = !conn_->streamManager->hasLoss() && - conn_->cryptoState->initialStream.lossBuffer.empty() && - conn_->cryptoState->handshakeStream.lossBuffer.empty() && - conn_->cryptoState->oneRttStream.lossBuffer.empty(); - if (conn_->congestionController && - currentSendBufLen < conn_->udpSendPacketLen && lossBufferEmpty && - conn_->congestionController->getWritableBytes()) { - conn_->congestionController->setAppLimited(); - // notify via connection call and any observer callbacks - if (transportReadyNotified_ && connCallback_) { - connCallback_->onAppRateLimited(); - } - conn_->appLimitedTracker.setAppLimited(); - notifyAppRateLimited(); - } - } - } - // Writing data could write out an ack which could cause us to cancel - // the ack timer. But we need to call scheduleAckTimeout() for it to take - // effect. - scheduleAckTimeout(); - schedulePathValidationTimeout(); - updateWriteLooper(false); -} - void QuicTransportBase::writeSocketDataAndCatch() { [[maybe_unused]] auto self = sharedGuard(); try { @@ -3767,24 +3659,6 @@ QuicTransportBase::setStreamGroupRetransmissionPolicy( return folly::unit; } -void QuicTransportBase::updatePacketProcessorsPrewriteRequests() { - folly::SocketCmsgMap cmsgs; - for (const auto& pp : conn_->packetProcessors) { - // In case of overlapping cmsg keys, the priority is given to - // that were added to the QuicSocket first. - auto writeRequest = pp->prewrite(); - if (writeRequest && writeRequest->cmsgs) { - cmsgs.insert(writeRequest->cmsgs->begin(), writeRequest->cmsgs->end()); - } - } - if (!cmsgs.empty()) { - conn_->socketCmsgsState.additionalCmsgs = cmsgs; - } else { - conn_->socketCmsgsState.additionalCmsgs.reset(); - } - conn_->socketCmsgsState.targetWriteCount = conn_->writeCount; -} - void QuicTransportBase::validateECNState() { if (conn_->ecnState == ECNState::NotAttempted || conn_->ecnState == ECNState::FailedValidation) { diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index 3b8d350e6..36f97ce73 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -269,14 +269,6 @@ class QuicTransportBase : public QuicSocket, const folly::SocketAddress& peer, ReceivedUdpPacket&& udpPacket) = 0; - /** - * Invoked when we have to write some data to the wire. - * The subclass may use this to start writing data to the socket. - * It may also throw an exception in case of an error in which case the - * connection will be closed. - */ - virtual void writeData() = 0; - /** * closeTransport is invoked on the sub-class when the transport is closed. * The sub-class may clean up any state during this call. The transport @@ -572,9 +564,9 @@ class QuicTransportBase : public QuicSocket, QuicTransportBase* transport_; }; - void scheduleLossTimeout(std::chrono::milliseconds timeout); - void cancelLossTimeout(); - bool isLossTimeoutScheduled(); // TODO: make this const again + void scheduleLossTimeout(std::chrono::milliseconds timeout) override; + void cancelLossTimeout() override; + bool isLossTimeoutScheduled() override; // TODO: make this const again // If you don't set it, the default is Cubic void setCongestionControl(CongestionControlType type) override; @@ -666,7 +658,7 @@ class QuicTransportBase : public QuicSocket, void invokeStreamsAvailableCallbacks(); void updateReadLooper(); void updatePeekLooper(); - void updateWriteLooper(bool thisIteration, bool runInline = false); + void updateWriteLooper(bool thisIteration, bool runInline = false) override; void handlePingCallbacks(); void handleKnobCallbacks(); void handleAckEventCallbacks(); @@ -714,16 +706,6 @@ class QuicTransportBase : public QuicSocket, bool bidirectional, const OptionalIntegral& streamGroupId = std::nullopt); - /** - * write data to socket - * - * At transport layer, this is the simplest form of write. It writes data - * out to the network, and schedule necessary timers (ack, idle, loss). It is - * both pacing oblivious and writeLooper oblivious. Caller needs to explicitly - * invoke updateWriteLooper afterwards if that's desired. - */ - void writeSocketData(); - /** * A wrapper around writeSocketData * @@ -754,9 +736,9 @@ class QuicTransportBase : public QuicSocket, void pingTimeoutExpired() noexcept; void excessWriteTimeoutExpired() noexcept; - void setIdleTimer(); - void scheduleAckTimeout(); - void schedulePathValidationTimeout(); + void setIdleTimer() override; + void scheduleAckTimeout() override; + void schedulePathValidationTimeout() override; void schedulePingTimeout( PingCallback* callback, std::chrono::milliseconds pingTimeout); @@ -765,12 +747,12 @@ class QuicTransportBase : public QuicSocket, // Helpers to notify all registered observers about specific events during // socket write (if enabled in the observer's config). - void notifyStartWritingFromAppRateLimited(); + void notifyStartWritingFromAppRateLimited() override; void notifyPacketsWritten( const uint64_t numPacketsWritten, const uint64_t numAckElicitingPacketsWritten, - const uint64_t numBytesWritten); - void notifyAppRateLimited(); + const uint64_t numBytesWritten) override; + void notifyAppRateLimited() override; /** * Callback when we receive a transport knob @@ -814,8 +796,6 @@ class QuicTransportBase : public QuicSocket, */ Optional getAdditionalCmsgsForAsyncUDPSocket(); - std::unique_ptr socket_; - struct ReadCallbackData { ReadCallback* readCb; bool resumed{true}; @@ -842,7 +822,6 @@ class QuicTransportBase : public QuicSocket, QuicSocket::WriteCallback* connWriteCallback_{nullptr}; std::map pendingWriteCallbacks_; - bool transportReadyNotified_{false}; bool handshakeDoneNotified_{false}; LossTimeout lossTimeout_; @@ -991,14 +970,6 @@ class QuicTransportBase : public QuicSocket, * enabled, i.e. advertisedMaxStreamGroups in transport settings is > 0. */ [[nodiscard]] bool checkCustomRetransmissionProfilesEnabled() const; - - /** - * Helper function to collect prewrite requests from the PacketProcessors - * Currently this collects cmsgs to be written. The Cmsgs will be stored in - * the connection state and passed to AsyncUDPSocket in the next - * additionalCmsgs callback - */ - void updatePacketProcessorsPrewriteRequests(); }; std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt); diff --git a/quic/api/QuicTransportBaseLite.cpp b/quic/api/QuicTransportBaseLite.cpp index e8a919679..816840660 100644 --- a/quic/api/QuicTransportBaseLite.cpp +++ b/quic/api/QuicTransportBaseLite.cpp @@ -5,9 +5,11 @@ * LICENSE file in the root directory of this source tree. */ +#include #include #include #include +#include #include namespace { @@ -119,6 +121,116 @@ QuicTransportBaseLite::getStreamFlowControl(StreamId id) const { stream->flowControlState.advertisedMaxOffset); } +void QuicTransportBaseLite::writeSocketData() { + if (socket_) { + ++(conn_->writeCount); // incremented on each write (or write attempt) + + // record current number of sent packets to detect delta + const auto beforeTotalBytesSent = conn_->lossState.totalBytesSent; + const auto beforeTotalPacketsSent = conn_->lossState.totalPacketsSent; + const auto beforeTotalAckElicitingPacketsSent = + conn_->lossState.totalAckElicitingPacketsSent; + const auto beforeNumOutstandingPackets = + conn_->outstandings.numOutstanding(); + + updatePacketProcessorsPrewriteRequests(); + + // if we're starting to write from app limited, notify observers + if (conn_->appLimitedTracker.isAppLimited() && + conn_->congestionController) { + conn_->appLimitedTracker.setNotAppLimited(); + notifyStartWritingFromAppRateLimited(); + } + writeData(); + if (closeState_ != CloseState::CLOSED) { + if (conn_->pendingEvents.closeTransport == true) { + throw QuicTransportException( + "Max packet number reached", + TransportErrorCode::PROTOCOL_VIOLATION); + } + setLossDetectionAlarm(*conn_, *this); + + // check for change in number of packets + const auto afterTotalBytesSent = conn_->lossState.totalBytesSent; + const auto afterTotalPacketsSent = conn_->lossState.totalPacketsSent; + const auto afterTotalAckElicitingPacketsSent = + conn_->lossState.totalAckElicitingPacketsSent; + const auto afterNumOutstandingPackets = + conn_->outstandings.numOutstanding(); + CHECK_LE(beforeTotalPacketsSent, afterTotalPacketsSent); + CHECK_LE( + beforeTotalAckElicitingPacketsSent, + afterTotalAckElicitingPacketsSent); + CHECK_LE(beforeNumOutstandingPackets, afterNumOutstandingPackets); + CHECK_EQ( + afterNumOutstandingPackets - beforeNumOutstandingPackets, + afterTotalAckElicitingPacketsSent - + beforeTotalAckElicitingPacketsSent); + const bool newPackets = (afterTotalPacketsSent > beforeTotalPacketsSent); + const bool newOutstandingPackets = + (afterTotalAckElicitingPacketsSent > + beforeTotalAckElicitingPacketsSent); + + // if packets sent, notify observers + if (newPackets) { + notifyPacketsWritten( + afterTotalPacketsSent - beforeTotalPacketsSent + /* numPacketsWritten */, + afterTotalAckElicitingPacketsSent - + beforeTotalAckElicitingPacketsSent + /* numAckElicitingPacketsWritten */, + afterTotalBytesSent - beforeTotalBytesSent /* numBytesWritten */); + } + if (conn_->loopDetectorCallback && newOutstandingPackets) { + conn_->writeDebugState.currentEmptyLoopCount = 0; + } else if ( + conn_->writeDebugState.needsWriteLoopDetect && + conn_->loopDetectorCallback) { + // TODO: Currently we will to get some stats first. Then we may filter + // out some errors here. For example, socket fail to write might be a + // legit case to filter out. + conn_->loopDetectorCallback->onSuspiciousWriteLoops( + ++conn_->writeDebugState.currentEmptyLoopCount, + conn_->writeDebugState.writeDataReason, + conn_->writeDebugState.noWriteReason, + conn_->writeDebugState.schedulerName); + } + // If we sent a new packet and the new packet was either the first + // packet after quiescence or after receiving a new packet. + if (newOutstandingPackets && + (beforeNumOutstandingPackets == 0 || + conn_->receivedNewPacketBeforeWrite)) { + // Reset the idle timer because we sent some data. + setIdleTimer(); + conn_->receivedNewPacketBeforeWrite = false; + } + // Check if we are app-limited after finish this round of sending + auto currentSendBufLen = conn_->flowControlState.sumCurStreamBufferLen; + auto lossBufferEmpty = !conn_->streamManager->hasLoss() && + conn_->cryptoState->initialStream.lossBuffer.empty() && + conn_->cryptoState->handshakeStream.lossBuffer.empty() && + conn_->cryptoState->oneRttStream.lossBuffer.empty(); + if (conn_->congestionController && + currentSendBufLen < conn_->udpSendPacketLen && lossBufferEmpty && + conn_->congestionController->getWritableBytes()) { + conn_->congestionController->setAppLimited(); + // notify via connection call and any observer callbacks + if (transportReadyNotified_ && connCallback_) { + connCallback_->onAppRateLimited(); + } + conn_->appLimitedTracker.setAppLimited(); + notifyAppRateLimited(); + } + } + } + // Writing data could write out an ack which could cause us to cancel + // the ack timer. But we need to call scheduleAckTimeout() for it to take + // effect. + scheduleAckTimeout(); + schedulePathValidationTimeout(); + updateWriteLooper(false); +} + bool QuicTransportBaseLite::processCancelCode(const QuicError& cancelCode) { bool noError = false; switch (cancelCode.code.type()) { @@ -156,4 +268,22 @@ uint64_t QuicTransportBaseLite::maxWritableOnConn() const { return ret; } +void QuicTransportBaseLite::updatePacketProcessorsPrewriteRequests() { + folly::SocketCmsgMap cmsgs; + for (const auto& pp : conn_->packetProcessors) { + // In case of overlapping cmsg keys, the priority is given to + // that were added to the QuicSocket first. + auto writeRequest = pp->prewrite(); + if (writeRequest && writeRequest->cmsgs) { + cmsgs.insert(writeRequest->cmsgs->begin(), writeRequest->cmsgs->end()); + } + } + if (!cmsgs.empty()) { + conn_->socketCmsgsState.additionalCmsgs = cmsgs; + } else { + conn_->socketCmsgsState.additionalCmsgs.reset(); + } + conn_->socketCmsgsState.targetWriteCount = conn_->writeCount; +} + } // namespace quic diff --git a/quic/api/QuicTransportBaseLite.h b/quic/api/QuicTransportBaseLite.h index 42b8b845d..4ef5e8acc 100644 --- a/quic/api/QuicTransportBaseLite.h +++ b/quic/api/QuicTransportBaseLite.h @@ -17,10 +17,20 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { public: QuicTransportBaseLite( std::shared_ptr evb, + std::unique_ptr socket, bool useConnectionEndWithErrorCallback) : evb_(evb), + socket_(std::move(socket)), useConnectionEndWithErrorCallback_(useConnectionEndWithErrorCallback) {} + /** + * Invoked when we have to write some data to the wire. + * The subclass may use this to start writing data to the socket. + * It may also throw an exception in case of an error in which case the + * connection will be closed. + */ + virtual void writeData() = 0; + bool good() const override; bool error() const override; @@ -64,7 +74,71 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { [[nodiscard]] uint64_t maxWritableOnConn() const override; + virtual void scheduleLossTimeout(std::chrono::milliseconds /* timeout */) { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + + virtual void cancelLossTimeout() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + + virtual bool isLossTimeoutScheduled() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + return false; + } + protected: + /** + * write data to socket + * + * At transport layer, this is the simplest form of write. It writes data + * out to the network, and schedule necessary timers (ack, idle, loss). It is + * both pacing oblivious and writeLooper oblivious. Caller needs to explicitly + * invoke updateWriteLooper afterwards if that's desired. + */ + void writeSocketData(); + + virtual void updateWriteLooper( + bool /* thisIteration */, + bool /* runInline */ = false) { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + + // Helpers to notify all registered observers about specific events during + // socket write (if enabled in the observer's config). + virtual void notifyStartWritingFromAppRateLimited() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + virtual void notifyPacketsWritten( + const uint64_t /* numPacketsWritten */, + const uint64_t /* numAckElicitingPacketsWritten */, + const uint64_t /* numBytesWritten */) { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + virtual void notifyAppRateLimited() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + + virtual void setIdleTimer() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + virtual void scheduleAckTimeout() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + virtual void schedulePathValidationTimeout() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + void resetConnectionCallbacks() { connSetupCallback_ = nullptr; connCallback_ = nullptr; @@ -76,6 +150,7 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { void processConnectionCallbacks(QuicError&& cancelCode); std::shared_ptr evb_; + std::unique_ptr socket_; CloseState closeState_{CloseState::OPEN}; @@ -84,9 +159,20 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { // A flag telling transport if the new onConnectionEnd(error) cb must be used. bool useConnectionEndWithErrorCallback_{false}; + bool transportReadyNotified_{false}; + std:: unique_ptr conn_; + + private: + /** + * Helper function to collect prewrite requests from the PacketProcessors + * Currently this collects cmsgs to be written. The Cmsgs will be stored in + * the connection state and passed to AsyncUDPSocket in the next + * additionalCmsgs callback + */ + void updatePacketProcessorsPrewriteRequests(); }; } // namespace quic diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index bec5966ff..a92568b70 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -588,7 +588,8 @@ class TestQuicTransport return none; } - void updateWriteLooper(bool thisIteration) { + void updateWriteLooper(bool thisIteration, bool /* runInline */ = false) + override { QuicTransportBase::updateWriteLooper(thisIteration); } diff --git a/quic/api/test/TestQuicTransport.h b/quic/api/test/TestQuicTransport.h index 3899db87a..5f3bfa37c 100644 --- a/quic/api/test/TestQuicTransport.h +++ b/quic/api/test/TestQuicTransport.h @@ -74,7 +74,8 @@ class TestQuicTransport return conn.version.value_or(*conn.originalVersion); } - void updateWriteLooper(bool thisIteration) { + void updateWriteLooper(bool thisIteration, bool /* runInline */ = false) + override { QuicTransportBase::updateWriteLooper(thisIteration); }