diff --git a/CMakeLists.txt b/CMakeLists.txt index 8673c3f07..ad0546849 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,16 +45,6 @@ find_package(Fizz REQUIRED) find_package(Glog REQUIRED) find_package(Threads) -if(DEFINED CCP_ENABLED) - find_package(libccp REQUIRED) - set(LIBCCP_LIBRARY libccp::ccp_static) - # mvfst implementation of ccp is guarded with ifdef CCP_ENABLED - add_compile_definitions(CCP_ENABLED=${CCP_ENABLED}) - # libccp header files include extern c if __CPLUSPLUS__ is defined - add_compile_definitions(__CPLUSPLUS__=1) -endif() - - SET(GFLAG_DEPENDENCIES "") SET(QUIC_EXTRA_LINK_LIBRARIES "") SET(QUIC_EXTRA_INCLUDE_DIRECTORIES "") diff --git a/build_helper.sh b/build_helper.sh index 04f6dc53f..718c11013 100755 --- a/build_helper.sh +++ b/build_helper.sh @@ -25,14 +25,13 @@ Usage ${0##*/} [-h|?] [-p PATH] [-i INSTALL_PREFIX] -m (optional): Build folly without jemalloc -s (optional): Skip installing system package dependencies -c (optional): Use ccache - -z (optional): enable CCP support -f (optional): Skip fetching dependencies (to test local changes) -h|? Show this help message EOF } FETCH_DEPENDENCIES=true -while getopts ":hp:i:msczf" arg; do +while getopts ":hp:i:mscf" arg; do case $arg in p) BUILD_DIR="${OPTARG}" @@ -49,9 +48,6 @@ while getopts ":hp:i:msczf" arg; do c) MVFST_USE_CCACHE=true ;; - z) - MVFST_ENABLE_CCP=true - ;; f) FETCH_DEPENDENCIES=false ;; @@ -88,12 +84,10 @@ mkdir -p "$MVFST_BUILD_DIR" if [ -z "${INSTALL_PREFIX-}" ]; then FOLLY_INSTALL_DIR=$DEPS_DIR FIZZ_INSTALL_DIR=$DEPS_DIR - LIBCCP_INSTALL_DIR=$DEPS_DIR MVFST_INSTALL_DIR=$BWD else FOLLY_INSTALL_DIR=$INSTALL_PREFIX FIZZ_INSTALL_DIR=$INSTALL_PREFIX - LIBCCP_INSTALL_DIR=$INSTALL_PREFIX MVFST_INSTALL_DIR=$INSTALL_PREFIX fi @@ -354,32 +348,6 @@ function detect_platform() { echo -e "${COLOR_GREEN}Detected platform: $Platform ${COLOR_OFF}" } -function setup_libccp() { - LIBCCP_DIR=$DEPS_DIR/libccp - LIBCCP_BUILD_DIR=$LIBCCP_DIR/build/ - if [ ! -d "$LIBCCP_DIR" ] ; then - echo -e "${COLOR_GREEN}[ INFO ] Cloning libccp repo ${COLOR_OFF}" - git clone https://github.com/ccp-project/libccp "$LIBCCP_DIR" - fi - - #synch_dependency_to_commit "$LIBCCP_DIR" "$MVFST_ROOT_DIR/build/deps/github_hashes/libccp-rev.txt" - - echo -e "${COLOR_GREEN}Building libccp ${COLOR_OFF}" - mkdir -p "$LIBCCP_BUILD_DIR" - cd "$LIBCCP_BUILD_DIR" || exit - cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo \ - -DBUILD_TESTS=OFF \ - -DCMAKE_PREFIX_PATH="$LIBCCP_INSTALL_DIR" \ - -DCMAKE_INSTALL_PREFIX="$LIBCCP_INSTALL_DIR" \ - -DCMAKE_CXX_FLAGS=\D__CPLUSPLUS__=1 \ - ${CMAKE_EXTRA_ARGS[@]+"${CMAKE_EXTRA_ARGS[@]}"} \ - "$LIBCCP_DIR" - make -j "$nproc" - make install - echo -e "${COLOR_GREEN}libccp is installed ${COLOR_OFF}" - cd "$BWD" || exit -} - function setup_rust() { if ! [ -x "$(command -v rustc)" ] || ! [ -x "$(command -v cargo)" ]; then echo -e "${COLOR_RED}[ ERROR ] Rust not found (required for CCP support).${COLOR_OFF}\n" @@ -398,10 +366,6 @@ setup_googletest setup_zstd setup_folly setup_fizz -if [[ -n "${MVFST_ENABLE_CCP-}" ]]; then - setup_libccp - setup_rust -fi # build mvfst: @@ -413,9 +377,6 @@ mvfst_cmake_build_args=( -DBUILD_TESTS=On \ ${CMAKE_EXTRA_ARGS[@]+"${CMAKE_EXTRA_ARGS[@]}"} \ ) -if [[ -n "${MVFST_ENABLE_CCP-}" ]]; then - mvfst_cmake_build_args+=(-DCCP_ENABLED=TRUE) -fi cmake "${mvfst_cmake_build_args[@]}" ../.. make -j "$nproc" diff --git a/quic/QuicConstants.cpp b/quic/QuicConstants.cpp index 6900a59f2..7efa626fb 100644 --- a/quic/QuicConstants.cpp +++ b/quic/QuicConstants.cpp @@ -26,8 +26,6 @@ std::string_view congestionControlTypeToString(CongestionControlType type) { return kCongestionControlCopa2Str; case CongestionControlType::NewReno: return kCongestionControlNewRenoStr; - case CongestionControlType::CCP: - return kCongestionControlCcpStr; case CongestionControlType::StaticCwnd: return kCongestionControlStaticCwndStr; case CongestionControlType::None: @@ -53,8 +51,6 @@ std::optional congestionControlStrToType( return quic::CongestionControlType::Copa2; } else if (str == kCongestionControlNewRenoStr) { return quic::CongestionControlType::NewReno; - } else if (str == kCongestionControlCcpStr) { - return quic::CongestionControlType::CCP; } else if (str == kCongestionControlStaticCwndStr) { return quic::CongestionControlType::StaticCwnd; } else if (str == kCongestionControlNoneStr) { diff --git a/quic/QuicConstants.h b/quic/QuicConstants.h index 54cfa926d..b6f71498d 100644 --- a/quic/QuicConstants.h +++ b/quic/QuicConstants.h @@ -392,7 +392,6 @@ constexpr std::string_view kCongestionControlBbrTestingStr = "bbr_testing"; constexpr std::string_view kCongestionControlCopaStr = "copa"; constexpr std::string_view kCongestionControlCopa2Str = "copa2"; constexpr std::string_view kCongestionControlNewRenoStr = "newreno"; -constexpr std::string_view kCongestionControlCcpStr = "ccp"; constexpr std::string_view kCongestionControlStaticCwndStr = "staticcwnd"; constexpr std::string_view kCongestionControlNoneStr = "none"; @@ -404,7 +403,6 @@ enum class CongestionControlType : uint8_t { Copa2, BBR, BBRTesting, - CCP, StaticCwnd, None, // NOTE: MAX should always be at the end diff --git a/quic/congestion_control/CMakeLists.txt b/quic/congestion_control/CMakeLists.txt index 57bfcc37c..af26d20c2 100644 --- a/quic/congestion_control/CMakeLists.txt +++ b/quic/congestion_control/CMakeLists.txt @@ -16,7 +16,6 @@ add_library( Copa2.cpp NewReno.cpp QuicCubic.cpp - QuicCCP.cpp ServerCongestionControllerFactory.cpp SimulatedTBF.cpp StaticCwndCongestionController.cpp @@ -50,7 +49,6 @@ target_link_libraries( mvfst_exception mvfst_state_machine mvfst_state_functions - ${LIBCCP_LIBRARY} ) file( @@ -71,4 +69,3 @@ install( ) add_subdirectory(test) -add_subdirectory(third_party/ccp) diff --git a/quic/congestion_control/CongestionControllerFactory.cpp b/quic/congestion_control/CongestionControllerFactory.cpp index b42d18d30..d02d1a919 100644 --- a/quic/congestion_control/CongestionControllerFactory.cpp +++ b/quic/congestion_control/CongestionControllerFactory.cpp @@ -33,10 +33,6 @@ DefaultCongestionControllerFactory::makeCongestionController( case CongestionControlType::NewReno: congestionController = std::make_unique(conn); break; - case CongestionControlType::CCP: - LOG(ERROR) - << "Default CC Factory cannot make CCP. Falling back to cubic."; - FOLLY_FALLTHROUGH; case CongestionControlType::Cubic: congestionController = std::make_unique(conn); break; diff --git a/quic/congestion_control/QuicCCP.cpp b/quic/congestion_control/QuicCCP.cpp deleted file mode 100644 index 5f3a6b138..000000000 --- a/quic/congestion_control/QuicCCP.cpp +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#include - -#include -#include -#include - -#ifdef CCP_ENABLED -#include -#include -#endif - -// The ccpDatapath field is only defined when ccp is enabled. -// When ccp is not enabled, we can safely set this to null, because an -// instance of this class cannot be created (will be denied by -// ServerCongestionControllerFactory). - -namespace quic { - -#ifdef CCP_ENABLED -CCP::CCP(QuicConnectionStateBase& conn) - : conn_(static_cast(conn)), - endOfRecovery_(folly::none), - inFallback_(false), - fallbackCC_(conn) { - cwndBytes_ = boundedCwnd( - conn.transportSettings.initCwndInMss * conn.udpSendPacketLen, - conn_.udpSendPacketLen, - conn_.transportSettings.maxCwndInMss, - conn_.transportSettings.minCwndInMss); - - datapath_ = conn_.ccpDatapath; - - struct ccp_datapath_info info = { - .init_cwnd = static_cast(conn.transportSettings.initCwndInMss), - .mss = static_cast(conn.udpSendPacketLen), - // the rest is not used at the moment - .src_ip = 0, // conn.peerAddress.getIPAddress().asV4().toLong() - .src_port = 0, // conn.peerAddress.getPort(), - .dst_ip = 0, // where is the destination addr? - .dst_port = 0, // where is the destination addr? - }; - - if (!datapath_) { - fallback(); - return; - } - - // Inform CCP about this new connection. The returned ccp_conn_ object - // contains per-connection state and must be passed to all other libccp - // functions regarding this connection. We pass a reference to ourself - // so that it can be pulled out later by CCPReader when it needs to look - // up the correct CCP instance for a given connection id. - ccp_conn_ = ccp_connection_start(datapath_, (void*)this, &info); - if (ccp_conn_ == nullptr) { - fallback(); - LOG(ERROR) << "libccp::ccp_connection_start failed\n"; - } -} - -CCP::~CCP() { - if (ccp_conn_) { - // Inform CCP that this connection has ended and thus it can free related - // state. - ccp_connection_free(datapath_, ccp_conn_->index); - } -} - -void CCP::onRemoveBytesFromInflight(uint64_t bytes) { - DCHECK_LE(bytes, conn_.lossState.inflightBytes); - conn_.lossState.inflightBytes -= bytes; -} - -void CCP::onPacketSent(const OutstandingPacketWrapper& packet) { - if (std::numeric_limits::max() - conn_.lossState.inflightBytes < - packet.metadata.encodedSize) { - throw QuicInternalException( - "CCP: inflightBytes overflow", LocalErrorCode::INFLIGHT_BYTES_OVERFLOW); - } - - if (inFallback_) { - fallbackCC_.onPacketSent(packet); - } else { - // Fallback algorithm takes care of bytes acked since it shares the same - // conn_ - addAndCheckOverflow( - conn_.lossState.inflightBytes, packet.metadata.encodedSize); - } - - if (conn_.qLogger) { - conn_.qLogger->addCongestionMetricUpdate( - conn_.lossState.inflightBytes, - getCongestionWindow(), - kCongestionPacketSent); - } -} - -void CCP::onAckEvent(const AckEvent& ack) { - DCHECK(ack.largestNewlyAckedPacket.has_value() && !ack.ackedPackets.empty()); - - // Fallback algorithm takes care of bytes acked since it shares the same conn_ - if (!inFallback_) { - onRemoveBytesFromInflight(ack.ackedBytes); - } - - // Add latest state to this batch of udpates. See comment regarding - // ccp_primitives struct - struct ccp_primitives* mmt = &ccp_conn_->prims; - mmt->bytes_acked += ack.ackedBytes; - mmt->packets_acked += ack.ackedPackets.size(); - for (const auto& packet : ack.ackedPackets) { - if (packet.encodedSize == 0) { - continue; - } - if (packet.lastAckedPacketInfo) { - sendRate_ = Bandwidth( - packet.totalBytesSentThen - - packet.lastAckedPacketInfo->totalBytesSent, - std::chrono::duration_cast( - packet.sentTime - packet.lastAckedPacketInfo->sentTime)); - ackRate_ = Bandwidth( - conn_.lossState.totalBytesAcked - - packet.lastAckedPacketInfo->totalBytesAcked, - std::chrono::duration_cast( - ack.ackTime - packet.lastAckedPacketInfo->ackTime)); - } else if (ack.ackTime > packet.sentTime) { - sendRate_ = Bandwidth( - packet.encodedSize, - std::chrono::duration_cast( - ack.ackTime - packet.sentTime)); - } - } -} - -void CCP::onPacketAckOrLoss( - const AckEvent* FOLLY_NULLABLE ackEvent, - const LossEvent* FOLLY_NULLABLE lossEvent) { - // If we are in fallback mode, forward the call to the fallback algorithm. - if (inFallback_) { - fallbackCC_.onPacketAckOrLoss(ackEvent, lossEvent); - } - if (!ccp_conn_) { - return; - } - - // If we never connected to ccp in the first place, nothing else to do - // regardless - if (!ccp_conn_) { - return; - } - - /** - * Even if we are in fallback, we finish the rest of this function so that we - * can keep the ccp primitives up to date in case connection with CCP is - * restored. - */ - if (lossEvent) { - onLossEvent(*lossEvent); - if (conn_.pacer) { - conn_.pacer->onPacketsLoss(); - } - } - - if (ackEvent && ackEvent->largestNewlyAckedPacket.hasValue()) { - onAckEvent(*ackEvent); - } - - /** - * The ccp_primitives struct contains the list of all statistics ccp wants to - * know about. These are kept as up to date as possible, and fed to ccp_invoke - * every time there's an ack or loss. Whenever ccp_invoke is called, libccp - * internally decides whether or not to batch and send the updates. As the - * caller, this is totally abstracted from us. - */ - struct ccp_primitives* mmt = &ccp_conn_->prims; - mmt->snd_cwnd = cwndBytes_; - mmt->rtt_sample_us = conn_.lossState.srtt.count(); - mmt->bytes_in_flight = conn_.lossState.inflightBytes; - mmt->rate_outgoing = sendRate_.normalize(); - mmt->rate_incoming = ackRate_.normalize(); - mmt->bytes_misordered = 0; // used for TCP SACK - mmt->packets_misordered = 0; // used for TCP SACK - /** - * This is the heart of ccp on the datapath side. It takes the ccp_primitives - * and runs them through the current datapath program. It handles sending - * these in an update to ccp if necessary. It also keeps track of the fallback - * timer internally. If the timer has expired, all calls to ccp_invoke will - * return LIBCCP_FALLBACK_TIMED_OUT until connection with ccp is restored. To - * get a log when this happens (once per datapath rather than per connectoion) - * enable low level logging in libccp directly. - */ - int ret = ccp_invoke(ccp_conn_); - - // If this is the first time we've received LIBCCP_FALLBACK_TIMED_OUT, it - // means we need to switch modes, otherwise we're already in fallback and can - // ignore it. - if (ret == LIBCCP_FALLBACK_TIMED_OUT && !inFallback_) { - fallback(); - } - - // If we're in fallback mode and libccp returned ok, then we can switch out of - // fallback mode. - if (ret == 0 && inFallback_) { - restoreAfterFallback(); - } - - // If libccp returned an error code other than FALLBACK_TIMED_OUT, it - // indicates an actual problem that we should raise an alert about. - if (ret && ret != LIBCCP_FALLBACK_TIMED_OUT) { - LOG(ERROR) << "libccp::ccp_invoke failed ret=" << ret; - } - - // These measurements represent a counter since the last call to ccp_invoke, - // so we need to reset them each time. The other measurements are just the - // most up-to-date view of the statistics, so they can simply be overwritten - // each time. - mmt->bytes_acked = 0; - mmt->packets_acked = 0; - mmt->lost_pkts_sample = 0; -} - -void CCP::fallback() { - inFallback_ = true; - // This just starts the fallback alg where we left off so it doesn't need to - // restart all connections at init cwnd again. - fallbackCC_.handoff(cwndBytes_); -} - -void CCP::restoreAfterFallback() { - inFallback_ = false; - // Pick up the cwnd where the fallback alg left off. - setCongestionWindow(fallbackCC_.getCongestionWindow()); -} - -void CCP::onLossEvent(const LossEvent& loss) { - DCHECK( - loss.largestLostPacketNum.hasValue() && - loss.largestLostSentTime.hasValue()); - - // Each "lost_pkt" is counted as a loss event in CCP, which will often warrant - // a different response. This logic helps distinguish between multiple lost - // packets within the same event and multiple distinct loss events. - if (!endOfRecovery_ || *endOfRecovery_ < *loss.largestLostSentTime) { - endOfRecovery_ = Clock::now(); - ccp_conn_->prims.lost_pkts_sample += loss.lostPackets; - } - - // Fallback algorithm takes care of bytes acked since it shares the same conn_ - if (!inFallback_) { - onRemoveBytesFromInflight(loss.lostBytes); - } -} - -void CCP::setPacingRate(uint64_t rate) noexcept { - pacingRate_ = rate; - if (conn_.pacer) { - conn_.pacer->setPacingRate(rate); - } else { - LOG(ERROR) << "setPacingRate called but pacer is undefined!"; - } -} - -uint64_t CCP::getWritableBytes() const noexcept { - uint64_t cwndBytes = getCongestionWindow(); - return cwndBytes > conn_.lossState.inflightBytes - ? cwndBytes - conn_.lossState.inflightBytes - : 0; -} - -uint64_t CCP::getCongestionWindow() const noexcept { - uint64_t cwnd = cwndBytes_; - if (inFallback_) { - cwnd = fallbackCC_.getCongestionWindow(); - } - return cwnd; -} - -void CCP::setCongestionWindow(uint64_t cwnd) noexcept { - cwndBytes_ = cwnd; -} - -uint64_t CCP::getBytesInFlight() const noexcept { - return conn_.lossState.inflightBytes; -} -#else - -CCP::CCP(QuicConnectionStateBase& conn) - : conn_(static_cast(conn)), - endOfRecovery_(folly::none), - inFallback_(false), - fallbackCC_(conn) {} - -void CCP::onRemoveBytesFromInflight(uint64_t) {} -void CCP::onPacketSent(const OutstandingPacketWrapper&) {} -void CCP::onPacketAckOrLoss( - const AckEvent* FOLLY_NULLABLE, - const LossEvent* FOLLY_NULLABLE) {} -uint64_t CCP::getWritableBytes() const noexcept { - return 0; -} -uint64_t CCP::getCongestionWindow() const noexcept { - return 0; -} -void CCP::setCongestionWindow(uint64_t) noexcept {} -uint64_t CCP::getBytesInFlight() const noexcept { - return 0; -} -void CCP::setPacingRate(uint64_t) noexcept {} -void CCP::onLossEvent(const LossEvent&) {} -void CCP::onAckEvent(const AckEvent&) {} -void CCP::fallback() {} -void CCP::restoreAfterFallback() {} - -CCP::~CCP() = default; - -#endif - -CongestionControlType CCP::type() const noexcept { - return CongestionControlType::CCP; -} - -void CCP::setAppIdle(bool, TimePoint) noexcept { - /* unsupported */ -} - -void CCP::setAppLimited() { - /* unsupported */ -} - -bool CCP::isAppLimited() const noexcept { - /* unsupported */ - return false; -} - -} // namespace quic diff --git a/quic/congestion_control/QuicCCP.h b/quic/congestion_control/QuicCCP.h deleted file mode 100644 index ce6a12bbd..000000000 --- a/quic/congestion_control/QuicCCP.h +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#ifdef CCP_ENABLED -#include -#endif - -#include - -namespace quic { - -/** - * The class defines CCP as a congestion control "algorithm", but it really acts - * as a proxy layer between the datapath (mvfst) and CCP. Each instance of this - * class corresponds to a separate connection that is using CCP for congestion - * control. - * - * From the perspective of this class, the behavior is very simple: - * - On each ack or loss, it updates some statistics about the connection (rtt - * etc.) - * - Once in a while, after updating it will batch the last few updates and send - * them to CCP (via IPC). - * - Asynchronously, at any time CCP may send a message (via IPC) telling this - * connection to use a new cwnd or rate. CCPReader receives these messages, - * looks up the correct instance of QuicCCP for that connection, and calls - * setCongestionWindow or setPacingRate. This class does not contain any logic - * for updating the cwnd (or rate) directly, they only change when directed by - * CCP. - * - * The low-level functionality, such as batching updates, serializing them, - * sending IPC, etc. is handled by libccp, which is a helper library provided by - * the CCP authors. - * - * If, for whatever reason, mvfst has not received any responses from CCP for a - * while, we go into "fallback" mode and use cubic instead, by simply proxying - * the acks, losses, and cwnd updates to an internal instance of QuicCubic. If - * connection is re-established with CCP, we leave fallback mode and pick up the - * cwnd where cubic left off. - * - */ -class CCP : public CongestionController { - public: - explicit CCP(QuicConnectionStateBase& conn); - ~CCP() override; - - void onRemoveBytesFromInflight(uint64_t) override; - void onPacketSent(const OutstandingPacketWrapper& packet) override; - void onPacketAckOrLoss( - const AckEvent* FOLLY_NULLABLE, - const LossEvent* FOLLY_NULLABLE) override; - void onPacketAckOrLoss( - folly::Optional ack, - folly::Optional loss) { - onPacketAckOrLoss(ack.get_pointer(), loss.get_pointer()); - } - - FOLLY_NODISCARD uint64_t getWritableBytes() const noexcept override; - FOLLY_NODISCARD uint64_t getCongestionWindow() const noexcept override; - // Called (indirectly) by CCP when it wants to update the cwnd for this - // connection. - void setCongestionWindow(uint64_t cwnd) noexcept; - void setAppIdle(bool, TimePoint) noexcept override; - void setAppLimited() override; - - FOLLY_NODISCARD CongestionControlType type() const noexcept override; - - FOLLY_NODISCARD uint64_t getBytesInFlight() const noexcept; - - // Called indirectly by CCP when it wants to update the pacing rate for this - // connection. - void setPacingRate(uint64_t rate) noexcept; - - FOLLY_NODISCARD bool isAppLimited() const noexcept override; - - void getStats(CongestionControllerStats& /*stats*/) const override {} - - private: - void onLossEvent(const LossEvent&); - void onAckEvent(const AckEvent&); - // Fallback to in-datapath congestion control impl (eg. because ccp not - // responding) - void fallback(); - // Go back to using CCP after a period of fallback - void restoreAfterFallback(); - - private: - QuicServerConnectionState& conn_; - // The current cwnd for this connection when we are not in fallback mode. - // When we are in fallback mode, we instead use - // fallbackCC_->getCongestionWindow(). - uint64_t cwndBytes_; - // The current pacing rate for this connection when we are not in fallback - // mode. When we are in fallback mode, the pacing rate is set by fallbackCC_. - uint64_t pacingRate_{0}; - // The per-connection state needed by libccp, initialized by calling - // libccp::ccp_connection_start in the constructor. - struct ccp_connection* ccp_conn_{nullptr}; - // The global (per QuicServerWorker) state needed by libccp, retrieved from - // the connection's corresponding QuicServerConnectionState. - struct ccp_datapath* datapath_{nullptr}; - // Used to help ensure we don't count the same loss event multiple times. - folly::Optional endOfRecovery_; - // Current estimate of the send and ack rate, two of the basic statistics sent - // to CCP. - Bandwidth sendRate_, ackRate_; - // Whether or not we are in fallback mode. - bool inFallback_; - // An instance of QuicCubic to use during fallback mode. - Cubic fallbackCC_; -}; -} // namespace quic diff --git a/quic/congestion_control/ServerCongestionControllerFactory.cpp b/quic/congestion_control/ServerCongestionControllerFactory.cpp index 44856c51d..c44b10567 100644 --- a/quic/congestion_control/ServerCongestionControllerFactory.cpp +++ b/quic/congestion_control/ServerCongestionControllerFactory.cpp @@ -14,7 +14,6 @@ #include #include #include -#include #include #include @@ -35,15 +34,6 @@ ServerCongestionControllerFactory::makeCongestionController( case CongestionControlType::NewReno: congestionController = std::make_unique(conn); break; - case CongestionControlType::CCP: -#ifdef CCP_ENABLED - congestionController = std::make_unique(conn); - break; -#else - LOG(ERROR) - << "Server CC Factory cannot make CCP (unless recompiled with -DCCP_ENABLED). Falling back to cubic."; - FOLLY_FALLTHROUGH; -#endif case CongestionControlType::Cubic: congestionController = std::make_unique(conn); break; diff --git a/quic/congestion_control/test/CCPTest.cpp b/quic/congestion_control/test/CCPTest.cpp deleted file mode 100644 index 6814f2074..000000000 --- a/quic/congestion_control/test/CCPTest.cpp +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#include - -#include -#include - -#define MSS 1232 - -namespace quic::test { - -/* -class QuicCCPTest : public Test { -}; -*/ - -#ifdef CCP_ENABLED -TEST(CCPTest, TestFallback) { - folly::ScopedEventBaseThread evbThread_; - QuicCcpThreadLauncher launcher; - - EXPECT_FALSE(launcher.hasLaunched()); - - evbThread_.getEventBase()->waitUntilRunning(); - - std::unique_ptr reader = std::make_unique(); - evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] { - reader->try_initialize(evbThread_.getEventBase(), 0, 0, 0); - auto ret = reader->connect(); - EXPECT_LT(ret, 0); - reader->start(); - }); - - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - std::unique_ptr ccp; - QuicServerConnectionState conn( - FizzServerQuicHandshakeContext::Builder().build()); - evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] { - conn.ccpDatapath = reader->getDatapath(); - conn.lossState.srtt = 50us; - ccp = std::make_unique(conn); - EXPECT_EQ( - ccp->getCongestionWindow(), - conn.transportSettings.initCwndInMss * conn.udpSendPacketLen); - EXPECT_EQ(conn.lossState.inflightBytes, 0); - }); - - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] { - auto packet1 = makeTestingWritePacket(10, MSS, MSS); - ccp->onPacketSent(packet1); - EXPECT_EQ(conn.lossState.inflightBytes, MSS); - auto packet2 = makeTestingWritePacket(20, MSS, MSS); - ccp->onPacketSent(packet2); - EXPECT_EQ(conn.lossState.inflightBytes, MSS * 2); - auto packet3 = makeTestingWritePacket(30, MSS, MSS); - ccp->onPacketSent(packet3); - EXPECT_EQ(conn.lossState.inflightBytes, MSS * 3); - ccp->onPacketAckOrLoss( - makeAck(10, MSS, Clock::now(), packet1.metadata.time), folly::none); - EXPECT_EQ(conn.lossState.inflightBytes, MSS * 2); - ccp->onPacketAckOrLoss( - makeAck(20, MSS, Clock::now(), packet2.metadata.time), folly::none); - EXPECT_EQ(conn.lossState.inflightBytes, MSS * 1); - ccp->onPacketAckOrLoss( - makeAck(30, MSS, Clock::now(), packet3.metadata.time), folly::none); - EXPECT_EQ(conn.lossState.inflightBytes, 0); - }); - - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] { - ccp.reset(); - auto manual = reader.release(); - delete manual; - }); -} - -#define xstr(s) str(s) -#define str(s) #s -#define FIXED_CWND_TEST 100 -#define INIT_CWND 10 - -TEST(CCPTest, TestSimple) { - folly::ScopedEventBaseThread evbThread_; - QuicCcpThreadLauncher launcher; - - evbThread_.getEventBase()->waitUntilRunning(); - - std::string ccpConfig = - std::string("constant --cwnd=") + std::string(xstr(FIXED_CWND_TEST)); - launcher.start(ccpConfig); - - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - EXPECT_TRUE(launcher.hasLaunched()); - - std::unique_ptr reader = std::make_unique(); - evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] { - reader->try_initialize( - evbThread_.getEventBase(), launcher.getCcpId(), 0, 0); - auto ret = reader->connect(); - EXPECT_GE(ret, 0); - reader->start(); - }); - - std::unique_ptr ccp; - QuicServerConnectionState conn( - FizzServerQuicHandshakeContext::Builder().build()); - evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] { - conn.ccpDatapath = reader->getDatapath(); - conn.lossState.srtt = 50us; - ccp = std::make_unique(conn); - EXPECT_EQ(ccp->getCongestionWindow(), INIT_CWND * conn.udpSendPacketLen); - }); - - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - - evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] { - auto packet1 = makeTestingWritePacket(10, MSS, MSS); - ccp->onPacketSent(packet1); - auto packet2 = makeTestingWritePacket(20, MSS, MSS); - ccp->onPacketSent(packet2); - auto packet3 = makeTestingWritePacket(30, MSS, MSS); - ccp->onPacketSent(packet3); - EXPECT_EQ(ccp->getCongestionWindow(), INIT_CWND * conn.udpSendPacketLen); - EXPECT_EQ(conn.lossState.inflightBytes, MSS * 3); - ccp->onPacketAckOrLoss( - makeAck(10, MSS, Clock::now(), packet1.metadata.time), folly::none); - EXPECT_EQ( - ccp->getCongestionWindow(), FIXED_CWND_TEST * conn.udpSendPacketLen); - ccp->onPacketAckOrLoss( - makeAck(20, MSS, Clock::now(), packet2.metadata.time), folly::none); - ccp->onPacketAckOrLoss( - makeAck(30, MSS, Clock::now(), packet3.metadata.time), folly::none); - EXPECT_EQ(conn.lossState.inflightBytes, 0); - EXPECT_EQ( - ccp->getCongestionWindow(), FIXED_CWND_TEST * conn.udpSendPacketLen); - }); - - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - - evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] { - ccp.reset(); - auto manual = reader.release(); - delete manual; - }); - - launcher.stop(); -} -#endif - -} // namespace quic::test diff --git a/quic/congestion_control/third_party/ccp/.gitignore b/quic/congestion_control/third_party/ccp/.gitignore deleted file mode 100644 index 03314f77b..000000000 --- a/quic/congestion_control/third_party/ccp/.gitignore +++ /dev/null @@ -1 +0,0 @@ -Cargo.lock diff --git a/quic/congestion_control/third_party/ccp/CMakeLists.txt b/quic/congestion_control/third_party/ccp/CMakeLists.txt deleted file mode 100644 index 91e4adfb1..000000000 --- a/quic/congestion_control/third_party/ccp/CMakeLists.txt +++ /dev/null @@ -1,47 +0,0 @@ -# The core CCP library is written in rust, thus we need to use cargo to build it. -# This file externally calls out to cargo and then sets some variables to tell cmake where to find the generated library. - -if(DEFINED CCP_ENABLED) - - # choose cargo build mode based on cmake build mode - if (CMAKE_BUILD_TYPE STREQUAL "Debug") - # always need to build with nightly because portus (ccp rust library) requires it (at the moment) - set(CARGO_CMD cargo +nightly build --lib) - set(TARGET_DIR "debug") - else () - # always need to build with nightly because portus (ccp rust library) requires it (at the moment) - set(CARGO_CMD cargo +nightly build --lib --release) - set(TARGET_DIR "release") - endif () - - # cargo will place the library here - set(CARGO_TARGET_LOC "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_DIR}/libstartccp.a") - # but cmake likes the library to be here (i.e. no ${TARGET_DIR}) - set(CMAKE_TARGET_LOC "${CMAKE_CURRENT_BINARY_DIR}/libstartccp.a") - - # since cmake isn't building ccp directly (cargo is), it doesn't know which source files it depends on - # (and thus when to rebuild it, so we need to specify it manually below with "DEPENDS"...) - # the set of source files is any rust or Cargo.toml file in quic/congestion_control/third_party - file(GLOB_RECURSE ALG_SRC ${CMAKE_CURRENT_SOURCE_DIR}/../ *.rs Cargo.toml) - message(STATUS "found alg files: ${ALG_SRC}") - - # run cargo to actually build the library, - # then copy the library to where cmake wants it to be - add_custom_command( - OUTPUT ${CMAKE_TARGET_LOC} - COMMAND CARGO_TARGET_DIR=${CMAKE_CURRENT_BINARY_DIR} ${CARGO_CMD} - COMMAND cp ${CARGO_TARGET_LOC} ${CMAKE_TARGET_LOC} - WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} - DEPENDS ${ALG_SRC} - COMMENT "Building CCP algorithms..." - VERBATIM - ) - # we make a target that depends on the output of cargo (as opposed to generating it here directly) so that cmake - # doesn't re-run cargo every time - add_custom_target(libstartccp DEPENDS ${CMAKE_TARGET_LOC}) - - # each cmakelists.txt has its own scope, so we can't access CMAKE_TARGET_LOC in the scope where we need to use it as a dependency - # so instead, we set it as a property of this target, which we can lookup later using get_target_properties() - set_target_properties(libstartccp PROPERTIES LOCATION ${CMAKE_TARGET_LOC}) - -endif() diff --git a/quic/congestion_control/third_party/ccp/Cargo.toml b/quic/congestion_control/third_party/ccp/Cargo.toml deleted file mode 100644 index 06ec7a1b7..000000000 --- a/quic/congestion_control/third_party/ccp/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "startccp" -version = "0.1.0" -edition = "2021" - -[lib] -name = "startccp" -crate-type = ["staticlib", "cdylib"] - -[dependencies] -libc = "0.2" -clap = "2.32" -portus = { git = "https://github.com/ccp-project/portus", rev = "ef2ddb674785e1f0c0869fe648896ac866659ab3" } -portus_export = { git = "https://github.com/ccp-project/portus", rev = "ef2ddb674785e1f0c0869fe648896ac866659ab3" } - -[profile.release] -opt-level = 3 -lto = true diff --git a/quic/congestion_control/third_party/ccp/ccp_const/src/bin/ccp_const.rs b/quic/congestion_control/third_party/ccp/ccp_const/src/bin/ccp_const.rs deleted file mode 100644 index 9b3381867..000000000 --- a/quic/congestion_control/third_party/ccp/ccp_const/src/bin/ccp_const.rs +++ /dev/null @@ -1,91 +0,0 @@ -extern crate ccp_const; -extern crate clap; -extern crate portus; -extern crate slog; - -use ccp_const::CcpConstAlg; -use clap::Arg; -use slog::warn; - -const MBPS_TO_BPS: u32 = 1_000_000; -const BITS_TO_BYTES: u32 = 8; -const PKTS_TO_BYTES: u32 = 1500; -fn main() { - let log = portus::algs::make_logger(); - - let (cfg, ipc) = || -> Result<(CcpConstAlg, String), String> { - let matches = clap::App::new("CCP Constant Cwnd/Rate") - .version("0.1.0") - .author("") - .about("Congestion control algorithm which sets a constant rate or cwnd") - .arg( - Arg::with_name("ipc") - .long("ipc") - .help("Sets the type of ipc to use: (netlink|unix)") - .takes_value(true) - .required(true) - .validator(portus::algs::ipc_valid), - ) - .arg( - Arg::with_name("cwnd") - .long("cwnd") - .takes_value(true) - .help("Sets the congestion window, in packets."), - ) - .arg( - Arg::with_name("rate") - .long("rate") - .takes_value(true) - .help("Sets the rate to use, in Mbps, must also specify cwnd_cap"), - ) - .arg( - Arg::with_name("cwnd_cap") - .long("cwnd_cap") - .takes_value(true) - .help("The max cwnd, in packets, *only* when setting a rate"), - ) - .group( - clap::ArgGroup::with_name("to_set") - .args(&["cwnd", "rate"]) - .required(true), - ) - .get_matches(); - - if !matches.is_present("to_set") { - return Err(String::from("must supply either cwnd or rate")); - } - - let const_param = if matches.is_present("rate") { - let rate = u32::from_str_radix(matches.value_of("rate").unwrap(), 10) - .map_err(|e| format!("{:?}", e))?; - if !matches.is_present("cwnd_cap") { - return Err(String::from("when using rate, must also specify cwnd_cap")); - } - let cwnd_cap = u32::from_str_radix(matches.value_of("cwnd_cap").unwrap(), 10) - .map_err(|e| format!("{:?}", e))?; - - let rate = rate * MBPS_TO_BPS / BITS_TO_BYTES; - let cwnd_cap = cwnd_cap * PKTS_TO_BYTES; - Ok(ccp_const::Constant::Rate { rate, cwnd_cap }) - } else if matches.is_present("cwnd") { - let cwnd = u32::from_str_radix(matches.value_of("cwnd").unwrap(), 10) - .map_err(|e| format!("{:?}", e))?; - let cwnd = cwnd * PKTS_TO_BYTES; - Ok(ccp_const::Constant::Cwnd(cwnd)) - } else { - Err(String::from("must supply either cwnd or rate")) - }?; - - Ok(( - CcpConstAlg { - logger: Some(log.clone()), - const_param, - }, - String::from(matches.value_of("ipc").unwrap()), - )) - }() - .map_err(|e| warn!(log, "bad argument"; "err" => ?e)) - .unwrap(); - - portus::start!(ipc.as_str(), Some(log), cfg).unwrap() -} diff --git a/quic/congestion_control/third_party/ccp/ccp_const/src/lib.rs b/quic/congestion_control/third_party/ccp/ccp_const/src/lib.rs deleted file mode 100644 index edf9c0388..000000000 --- a/quic/congestion_control/third_party/ccp/ccp_const/src/lib.rs +++ /dev/null @@ -1,201 +0,0 @@ -extern crate clap; -extern crate portus; -extern crate portus_export; -extern crate slog; - -use portus::ipc::Ipc; -use portus::lang::Scope; -use portus::{CongAlg, CongAlgBuilder, Datapath, DatapathInfo, DatapathTrait, Report}; -use slog::{debug, warn}; -use std::collections::HashMap; - -#[derive(Clone, Copy)] -pub enum Constant { - Cwnd(u32), - Rate { rate: u32, cwnd_cap: u32 }, -} - -#[portus_export::register_ccp_alg] -pub struct CcpConstAlg { - pub logger: Option, - pub const_param: Constant, -} - -pub struct CcpConstFlow { - logger: Option, - sc: Scope, - control_channel: Datapath, - const_param: Constant, - mss: u32, -} - -impl CongAlg for CcpConstAlg { - type Flow = CcpConstFlow; - - fn name() -> &'static str { - "constant" - } - - fn datapath_programs(&self) -> HashMap<&'static str, String> { - let mut h = HashMap::default(); - h.insert( - "constant", - " - (def (Report - (volatile rtt 0) - (volatile rin 0) - (volatile rout 0) - (volatile loss 0) - )) - (when true - (:= Report.rtt Flow.rtt_sample_us) - (:= Report.rin Flow.rate_outgoing) - (:= Report.rout Flow.rate_incoming) - (:= Report.loss (+ Report.loss Ack.lost_pkts_sample)) - (fallthrough) - ) - (when (> Micros 1000000) - (report) - (:= Micros 0) - )" - .to_owned(), - ); - - h - } - - fn new_flow(&self, mut control: Datapath, info: DatapathInfo) -> Self::Flow { - let params = match self.const_param { - Constant::Cwnd(c) => vec![("Cwnd", c * info.mss)], - Constant::Rate { - rate: r, - cwnd_cap: c, - } => vec![("Cwnd", c * info.mss), ("Rate", r)], - }; - let sc = control.set_program("constant", Some(¶ms)).unwrap(); - CcpConstFlow { - logger: self.logger.clone(), - sc, - control_channel: control, - const_param: self.const_param, - mss: info.mss, - } - } -} - -use clap::Arg; -const MBPS_TO_BPS: u32 = 1_000_000; -const BITS_TO_BYTES: u32 = 8; -//const PKTS_TO_BYTES: u32 = 1500; -impl<'a, 'b> CongAlgBuilder<'a, 'b> for CcpConstAlg { - fn args() -> clap::App<'a, 'b> { - clap::App::new("CCP Constant Cwnd/Rate") - .version("0.2.0") - .author("") - .about("Congestion control algorithm which sets a constant rate or cwnd") - .arg( - Arg::with_name("cwnd") - .long("cwnd") - .takes_value(true) - .help("Sets the congestion window, in packets."), - ) - .arg( - Arg::with_name("rate") - .long("rate") - .takes_value(true) - .help("Sets the rate to use, in Mbps, must also specify cwnd_cap"), - ) - .arg( - Arg::with_name("cwnd_cap") - .long("cwnd_cap") - .takes_value(true) - .help("The max cwnd, in MSS packets, *only* when setting a rate"), - ) - .group( - clap::ArgGroup::with_name("to_set") - .args(&["cwnd", "rate"]) - .required(true), - ) - } - - fn with_arg_matches( - args: &clap::ArgMatches, - logger: Option, - ) -> Result { - if !args.is_present("to_set") { - return Err(portus::Error(String::from( - "must supply either cwnd or rate", - ))); - } - - let const_param = if args.is_present("rate") { - let rate = u32::from_str_radix(args.value_of("rate").unwrap(), 10) - .map_err(|e| portus::Error(e.to_string()))?; - if !args.is_present("cwnd_cap") { - return Err(portus::Error(String::from( - "when using rate, must also specify cwnd_cap", - ))); - } - let cwnd_cap = u32::from_str_radix(args.value_of("cwnd_cap").unwrap(), 10) - .map_err(|e| portus::Error(e.to_string()))?; - - let rate = rate * MBPS_TO_BPS / BITS_TO_BYTES; - let cwnd_cap = cwnd_cap; - Ok(Constant::Rate { rate, cwnd_cap }) - } else if args.is_present("cwnd") { - let cwnd = u32::from_str_radix(args.value_of("cwnd").unwrap(), 10) - .map_err(|e| portus::Error(e.to_string()))?; - let cwnd = cwnd; - Ok(Constant::Cwnd(cwnd)) - } else { - Err(portus::Error(String::from( - "must supply either cwnd or rate", - ))) - }?; - - Ok(Self { - logger, - const_param, - }) - } -} - -impl portus::Flow for CcpConstFlow { - fn on_report(&mut self, sock_id: u32, m: Report) { - let rtt = m - .get_field("Report.rtt", &self.sc) - .expect("expected rtt in report") as u32; - let rin = m - .get_field("Report.rin", &self.sc) - .expect("expected rin in report") as u32; - let rout = m - .get_field("Report.rout", &self.sc) - .expect("expected rout in report") as u32; - let loss = m - .get_field("Report.loss", &self.sc) - .expect("expected loss in report") as u32; - - self.logger.as_ref().map(|log| { - debug!(log, "report"; - "sid" => sock_id, - "rtt(us)" => rtt, - "rin(Bps)" => rin, - "rout(Bps)" => rout, - "loss(pkts)" => loss, - ); - }); - - let update = match self.const_param { - Constant::Cwnd(c) => vec![("Cwnd", c * self.mss)], - Constant::Rate { - rate: r, - cwnd_cap: c, - } => vec![("Cwnd", c * self.mss), ("Rate", r)], - }; - if let Err(e) = self.control_channel.update_field(&self.sc, &update) { - self.logger.as_ref().map(|log| { - warn!(log, "rate update error"; "err" => ?e,); - }); - } - } -} diff --git a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin/cubic.rs b/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin/cubic.rs deleted file mode 100644 index ffa0227f5..000000000 --- a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin/cubic.rs +++ /dev/null @@ -1,24 +0,0 @@ -extern crate clap; -extern crate time; - -#[macro_use] -extern crate slog; - -extern crate generic_cong_avoid; -extern crate portus; - -use generic_cong_avoid::cubic::Cubic; - -fn main() { - let log = portus::algs::make_logger(); - let (alg, ipc) = generic_cong_avoid::make_args("CCP Cubic", log.clone()) - .map_err(|e| warn!(log, "bad argument"; "err" => ?e)) - .unwrap(); - - info!(log, "initializing"; - "reports" => ?alg.report_option, - "slow_start_mode" => ?alg.ss, - ); - - generic_cong_avoid::start::(ipc.as_str(), log, alg); -} diff --git a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin/reno.rs b/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin/reno.rs deleted file mode 100644 index 0da2931dc..000000000 --- a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin/reno.rs +++ /dev/null @@ -1,24 +0,0 @@ -extern crate clap; -extern crate time; - -#[macro_use] -extern crate slog; - -extern crate generic_cong_avoid; -extern crate portus; - -use generic_cong_avoid::reno::Reno; - -fn main() { - let log = portus::algs::make_logger(); - let (alg, ipc) = generic_cong_avoid::make_args("CCP Reno", log.clone()) - .map_err(|e| warn!(log, "bad argument"; "err" => ?e)) - .unwrap(); - - info!(log, "initializing"; - "reports" => ?alg.report_option, - "slow_start_mode" => ?alg.ss, - ); - - generic_cong_avoid::start::(ipc.as_str(), log, alg); -} diff --git a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin_helper.rs b/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin_helper.rs deleted file mode 100644 index 8c12e4b89..000000000 --- a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/bin_helper.rs +++ /dev/null @@ -1,91 +0,0 @@ -use clap::Arg; -use { - Alg, GenericCongAvoidAlg, GenericCongAvoidConfigReport, GenericCongAvoidConfigSS, - DEFAULT_SS_THRESH, -}; - -pub fn make_args( - name: &str, - logger: impl Into>, -) -> Result<(Alg, String), std::num::ParseIntError> { - let ss_thresh_default = format!("{}", DEFAULT_SS_THRESH); - let matches = clap::App::new(name) - .version("0.2.0") - .author("Akshay Narayan ") - .about("CCP implementation of a congestion avoidance algorithm") - .arg(Arg::with_name("ipc") - .long("ipc") - .help("Sets the type of ipc to use: (netlink|unix)") - .default_value("unix") - .validator(portus::algs::ipc_valid)) - .arg(Arg::with_name("init_cwnd") - .long("init_cwnd") - .help("Sets the initial congestion window, in bytes. Setting 0 will use datapath default.") - .default_value("0")) - .arg(Arg::with_name("ss_thresh") - .long("ss_thresh") - .help("Sets the slow start threshold, in bytes") - .default_value(&ss_thresh_default)) - .arg(Arg::with_name("ss_in_fold") - .long("ss_in_fold") - .help("Implement slow start in the datapath")) - .arg(Arg::with_name("report_per_ack") - .long("per_ack") - .help("Specifies that the datapath should send a measurement upon every ACK")) - .arg(Arg::with_name("report_per_interval") - .long("report_interval_ms") - .short("i") - .takes_value(true)) - .group(clap::ArgGroup::with_name("interval") - .args(&["report_per_ack", "report_per_interval"]) - .required(false)) - .arg(Arg::with_name("compensate_update") - .long("compensate_update") - .help("Scale the congestion window update during slow start to compensate for reporting delay")) - .arg(Arg::with_name("deficit_timeout") - .long("deficit_timeout") - .default_value("0") - .help("Number of RTTs to wait after a loss event to allow further CWND reductions. \ - Default 0 means CWND deficit counting is enforced strictly with no timeout.")) - .args(&A::args()) - .get_matches(); - - let ipc = String::from(matches.value_of("ipc").unwrap()); - - Ok(( - Alg { - ss_thresh: u32::from_str_radix(matches.value_of("ss_thresh").unwrap(), 10)?, - init_cwnd: u32::from_str_radix(matches.value_of("init_cwnd").unwrap(), 10)?, - report_option: if matches.is_present("report_per_ack") { - GenericCongAvoidConfigReport::Ack - } else if matches.is_present("report_per_interval") { - GenericCongAvoidConfigReport::Interval(time::Duration::milliseconds( - matches - .value_of("report_per_interval") - .unwrap() - .parse() - .unwrap(), - )) - } else { - GenericCongAvoidConfigReport::Rtt - }, - ss: if matches.is_present("ss_in_fold") { - GenericCongAvoidConfigSS::Datapath - } else { - GenericCongAvoidConfigSS::Ccp - }, - use_compensation: matches.is_present("compensate_update"), - deficit_timeout: u32::from_str_radix(matches.value_of("deficit_timeout").unwrap(), 10)?, - logger: logger.into(), - alg: A::with_args(&matches), - }, - ipc, - )) -} - -pub fn start(ipc: &str, log: slog::Logger, alg: Alg) -where - A: 'static, -{ - portus::start!(ipc, Some(log), alg).unwrap() -} diff --git a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/cubic.rs b/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/cubic.rs deleted file mode 100644 index 99b937260..000000000 --- a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/cubic.rs +++ /dev/null @@ -1,167 +0,0 @@ -extern crate portus_export; -extern crate slog; -extern crate time; - -pub use crate::GenericCongAvoidAlg; -pub use crate::GenericCongAvoidFlow; -pub use crate::GenericCongAvoidMeasurements; - -#[derive(Default)] -pub struct Cubic { - pkt_size: u32, - init_cwnd: u32, - - cwnd: f64, - cwnd_cnt: f64, - tcp_friendliness: bool, - beta: f64, - fast_convergence: bool, - c: f64, - wlast_max: f64, - epoch_start: f64, - origin_point: f64, - d_min: f64, - wtcp: f64, - k: f64, - ack_cnt: f64, - cnt: f64, - cubic_rtt: f64, -} - -impl Cubic { - fn cubic_update(&mut self) { - self.ack_cnt += 1.0; - if self.epoch_start <= 0.0 { - self.epoch_start = - (time::get_time().sec as f64) + f64::from(time::get_time().nsec) / 1_000_000_000.0; - if self.cwnd < self.wlast_max { - let temp = (self.wlast_max - self.cwnd) / self.c; - self.k = (temp.max(0.0)).powf(1.0 / 3.0); - self.origin_point = self.wlast_max; - } else { - self.k = 0.0; - self.origin_point = self.cwnd; - } - - self.ack_cnt = 1.0; - self.wtcp = self.cwnd - } - - let t = (time::get_time().sec as f64) - + f64::from(time::get_time().nsec) / 1_000_000_000.0 - + self.d_min - - self.epoch_start; - let target = self.origin_point + self.c * ((t - self.k) * (t - self.k) * (t - self.k)); - if target > self.cwnd { - self.cnt = self.cwnd / (target - self.cwnd); - } else { - self.cnt = 100.0 * self.cwnd; - } - - if self.tcp_friendliness { - self.cubic_tcp_friendliness(); - } - } - - fn cubic_tcp_friendliness(&mut self) { - self.wtcp += ((3.0 * self.beta) / (2.0 - self.beta)) * (self.ack_cnt / self.cwnd); - self.ack_cnt = 0.0; - if self.wtcp > self.cwnd { - let max_cnt = self.cwnd / (self.wtcp - self.cwnd); - if self.cnt > max_cnt { - self.cnt = max_cnt; - } - } - } - - fn cubic_reset(&mut self) { - self.wlast_max = 0.0; - self.epoch_start = -0.1; - self.origin_point = 0.0; - self.d_min = -0.1; - self.wtcp = 0.0; - self.k = 0.0; - self.ack_cnt = 0.0; - } -} - -impl GenericCongAvoidAlg for Cubic { - type Flow = Self; - - fn name() -> &'static str { - "cubic" - } - - fn with_args(_: &clap::ArgMatches) -> Self { - Default::default() - } - - fn new_flow(&self, _logger: Option, init_cwnd: u32, mss: u32) -> Self::Flow { - Cubic { - pkt_size: mss, - init_cwnd: init_cwnd / mss, - cwnd: f64::from(init_cwnd / mss), - cwnd_cnt: 0.0f64, - tcp_friendliness: true, - beta: 0.3f64, - fast_convergence: true, - c: 0.4f64, - wlast_max: 0.0f64, - epoch_start: -0.1f64, - origin_point: 0.0f64, - d_min: -0.1f64, - wtcp: 0.0f64, - k: 0.0f64, - ack_cnt: 0.0f64, - cnt: 0.0f64, - cubic_rtt: 0.1f64, - } - } -} - -impl GenericCongAvoidFlow for Cubic { - fn curr_cwnd(&self) -> u32 { - (self.cwnd * f64::from(self.pkt_size)) as u32 - } - - fn set_cwnd(&mut self, cwnd: u32) { - self.cwnd = f64::from(cwnd) / f64::from(self.pkt_size); - } - - fn increase(&mut self, m: &GenericCongAvoidMeasurements) { - self.cubic_rtt = (f64::from(m.rtt)) * 0.000_001; - let f_rtt = (f64::from(m.rtt)) * 0.000_001; - let no_of_acks = ((f64::from(m.acked)) / (f64::from(self.pkt_size))) as u32; - for _i in 0..no_of_acks { - if self.d_min <= 0.0 || f_rtt < self.d_min { - self.d_min = f_rtt; - } - - self.cubic_update(); - if self.cwnd_cnt > self.cnt { - self.cwnd += 1.0; - self.cwnd_cnt = 0.0; - } else { - self.cwnd_cnt += 1.0; - } - } - } - - fn reduction(&mut self, _m: &GenericCongAvoidMeasurements) { - self.epoch_start = -0.1; - if self.cwnd < self.wlast_max && self.fast_convergence { - self.wlast_max = self.cwnd * ((2.0 - self.beta) / 2.0); - } else { - self.wlast_max = self.cwnd; - } - - self.cwnd *= 1.0 - self.beta; - if self.cwnd as u32 <= self.init_cwnd { - self.cwnd = f64::from(self.init_cwnd); - } - } - - fn reset(&mut self) { - self.cubic_reset(); - } -} diff --git a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/lib.rs b/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/lib.rs deleted file mode 100644 index 26950a92e..000000000 --- a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/lib.rs +++ /dev/null @@ -1,601 +0,0 @@ -extern crate time; -#[macro_use] -extern crate slog; -extern crate clap; -extern crate portus; - -use portus::ipc::Ipc; -use portus::lang::Scope; -use portus::{CongAlg, CongAlgBuilder, Datapath, DatapathInfo, DatapathTrait, Report}; -use std::collections::HashMap; - -pub mod cubic; -pub mod reno; - -//mod bin_helper; -//pub use bin_helper::{make_args, start}; - -pub const DEFAULT_SS_THRESH: u32 = 0x7fff_ffff; -pub const DEFAULT_SS_THRESH_STR: &'static str = "2147483647"; - -pub struct GenericCongAvoidMeasurements { - pub acked: u32, - pub was_timeout: bool, - pub sacked: u32, - pub loss: u32, - pub rtt: u32, - pub inflight: u32, -} - -#[derive(Debug, Clone, Copy)] -pub enum GenericCongAvoidConfigReport { - Ack, - Rtt, - Interval(time::Duration), -} - -#[derive(Debug, Clone, Copy)] -pub enum GenericCongAvoidConfigSS { - Datapath, - Ccp, -} - -pub trait GenericCongAvoidFlow { - fn curr_cwnd(&self) -> u32; - fn set_cwnd(&mut self, cwnd: u32); - fn increase(&mut self, m: &GenericCongAvoidMeasurements); - fn reduction(&mut self, m: &GenericCongAvoidMeasurements); - fn reset(&mut self) {} -} - -pub trait GenericCongAvoidAlg { - type Flow: GenericCongAvoidFlow; - - fn name() -> &'static str; - fn args<'a, 'b>() -> Vec> { - vec![] - } - fn with_args(matches: &clap::ArgMatches) -> Self; - fn new_flow(&self, logger: Option, init_cwnd: u32, mss: u32) -> Self::Flow; -} - -pub struct Alg { - pub deficit_timeout: u32, - pub init_cwnd: u32, - pub report_option: GenericCongAvoidConfigReport, - pub ss: GenericCongAvoidConfigSS, - pub ss_thresh: u32, - pub use_compensation: bool, - pub logger: Option, - pub alg: A, -} - -impl CongAlg for Alg { - type Flow = Flow; - - fn name() -> &'static str { - A::name() - } - - fn datapath_programs(&self) -> HashMap<&'static str, String> { - let mut h = HashMap::default(); - h.insert( - "DatapathIntervalProg", - " - (def - (Report - (volatile acked 0) - (volatile sacked 0) - (volatile loss 0) - (volatile timeout false) - (volatile rtt 0) - (volatile inflight 0) - ) - (reportTime 0) - ) - (when true - (:= Report.inflight Flow.bytes_in_flight) - (:= Report.rtt Flow.rtt_sample_us) - (:= Report.acked (+ Report.acked Ack.bytes_acked)) - (:= Report.sacked (+ Report.sacked Ack.packets_misordered)) - (:= Report.loss Ack.lost_pkts_sample) - (:= Report.timeout Flow.was_timeout) - (fallthrough) - ) - (when (|| Report.timeout (> Report.loss 0)) - (report) - (:= Micros 0) - ) - (when (> Micros reportTime) - (report) - (:= Micros 0) - ) - " - .to_string(), - ); - - h.insert( - "DatapathIntervalRTTProg", - " - (def (Report - (volatile acked 0) - (volatile sacked 0) - (volatile loss 0) - (volatile timeout false) - (volatile rtt 0) - (volatile inflight 0) - )) - (when true - (:= Report.inflight Flow.bytes_in_flight) - (:= Report.rtt Flow.rtt_sample_us) - (:= Report.acked (+ Report.acked Ack.bytes_acked)) - (:= Report.sacked (+ Report.sacked Ack.packets_misordered)) - (:= Report.loss Ack.lost_pkts_sample) - (:= Report.timeout Flow.was_timeout) - (fallthrough) - ) - (when (|| Report.timeout (> Report.loss 0)) - (report) - (:= Micros 0) - ) - (when (> Micros Flow.rtt_sample_us) - (report) - (:= Micros 0) - ) - " - .to_string(), - ); - - h.insert( - "AckUpdateProg", - " - (def (Report - (volatile acked 0) - (volatile sacked 0) - (volatile loss 0) - (volatile timeout false) - (volatile rtt 0) - (volatile inflight 0) - )) - (when true - (:= Report.acked (+ Report.acked Ack.bytes_acked)) - (:= Report.sacked (+ Report.sacked Ack.packets_misordered)) - (:= Report.loss Ack.lost_pkts_sample) - (:= Report.timeout Flow.was_timeout) - (:= Report.rtt Flow.rtt_sample_us) - (:= Report.inflight Flow.bytes_in_flight) - (report) - ) - " - .to_string(), - ); - - h.insert( - "SSUpdateProg", - " - (def (Report - (volatile acked 0) - (volatile sacked 0) - (volatile loss 0) - (volatile timeout false) - (volatile rtt 0) - (volatile inflight 0) - )) - (when true - (:= Report.acked (+ Report.acked Ack.bytes_acked)) - (:= Report.sacked (+ Report.sacked Ack.packets_misordered)) - (:= Report.loss Ack.lost_pkts_sample) - (:= Report.timeout Flow.was_timeout) - (:= Report.rtt Flow.rtt_sample_us) - (:= Report.inflight Flow.bytes_in_flight) - (:= Cwnd (+ Cwnd Ack.bytes_acked)) - (fallthrough) - ) - (when (|| Report.timeout (> Report.loss 0)) - (report) - ) - - " - .to_string(), - ); - - h - } - - fn new_flow(&self, control: Datapath, info: DatapathInfo) -> Self::Flow { - let init_cwnd = if self.init_cwnd != 0 { - self.init_cwnd - } else { - info.init_cwnd - }; - - // hack for now to deal with inconsistent units (init_cwnd should be in bytes, but is in pkts) - // if init_cwnd < info.mss { - // init_cwnd = init_cwnd * info.mss; - // } - - let mut s = Flow { - control_channel: control, - logger: self.logger.clone(), - report_option: self.report_option, - sc: Default::default(), - ss_thresh: self.ss_thresh, - rtt: 0, - in_startup: false, - mss: info.mss, - use_compensation: self.use_compensation, - deficit_timeout: self.deficit_timeout, - init_cwnd, - curr_cwnd_reduction: 0, - last_cwnd_reduction: time::now().to_timespec() - time::Duration::milliseconds(500), - alg: self.alg.new_flow(self.logger.clone(), init_cwnd, info.mss), - }; - - match (self.ss, self.report_option) { - (GenericCongAvoidConfigSS::Datapath, _) => { - s.sc = s.install_ss_update(); - s.in_startup = true; - } - (GenericCongAvoidConfigSS::Ccp, GenericCongAvoidConfigReport::Ack) => { - s.sc = s.install_ack_update(); - } - (GenericCongAvoidConfigSS::Ccp, GenericCongAvoidConfigReport::Rtt) => { - s.sc = s.install_datapath_interval_rtt(); - } - (GenericCongAvoidConfigSS::Ccp, GenericCongAvoidConfigReport::Interval(i)) => { - s.sc = s.install_datapath_interval(i); - } - } - - //self.logger.as_ref().map(|log| { - // debug!(log, "setting initial cwnd"; - // "curr_cwnd (bytes)" => s.alg.curr_cwnd(), - // "mss" => s.mss, - // ); - //}); - s.update_cwnd(); - - // eprintln!("sleeping..."); - // std::thread::sleep(std::time::Duration::from_millis(1500)); - // eprintln!("awake!"); - - s - } -} - -use clap::Arg; -impl<'a, 'b, A: GenericCongAvoidAlg> CongAlgBuilder<'a, 'b> for Alg { - fn args() -> clap::App<'a, 'b> { - clap::App::new("CCP generic congestion avoidance") - .version("0.4.0") - .author("CCP Project ") - .about("CCP implementation of a congestion avoidance algorithm") - .arg(Arg::with_name("ipc") - .long("ipc") - .help("Sets the type of ipc to use: (netlink|unix)") - .default_value("unix") - .validator(portus::algs::ipc_valid)) - .arg(Arg::with_name("init_cwnd") - .long("init_cwnd") - .help("Sets the initial congestion window, in bytes. Setting 0 will use datapath default.") - .default_value("0")) - .arg(Arg::with_name("ss_thresh") - .long("ss_thresh") - .help("Sets the slow start threshold, in bytes") - .default_value(&DEFAULT_SS_THRESH_STR)) - .arg(Arg::with_name("ss_in_fold") - .long("ss_in_fold") - .help("Implement slow start in the datapath")) - .arg(Arg::with_name("report_per_ack") - .long("per_ack") - .help("Specifies that the datapath should send a measurement upon every ACK")) - .arg(Arg::with_name("report_per_interval") - .long("report_interval_ms") - .short("i") - .takes_value(true)) - .group(clap::ArgGroup::with_name("interval") - .args(&["report_per_ack", "report_per_interval"]) - .required(false)) - .arg(Arg::with_name("compensate_update") - .long("compensate_update") - .help("Scale the congestion window update during slow start to compensate for reporting delay")) - .arg(Arg::with_name("deficit_timeout") - .long("deficit_timeout") - .default_value("0") - .help("Number of RTTs to wait after a loss event to allow further CWND reductions. \ - Default 0 means CWND deficit counting is enforced strictly with no timeout.")) - .args(&A::args()) - } - - fn with_arg_matches( - args: &clap::ArgMatches, - logger: Option, - ) -> Result { - Ok(Self { - ss_thresh: u32::from_str_radix(args.value_of("ss_thresh").unwrap(), 10)?, - init_cwnd: u32::from_str_radix(args.value_of("init_cwnd").unwrap(), 10)?, - report_option: if args.is_present("report_per_ack") { - GenericCongAvoidConfigReport::Ack - } else if args.is_present("report_per_interval") { - GenericCongAvoidConfigReport::Interval(time::Duration::milliseconds( - args.value_of("report_per_interval") - .unwrap() - .parse() - .unwrap(), - )) - } else { - GenericCongAvoidConfigReport::Rtt - }, - ss: if args.is_present("ss_in_fold") { - GenericCongAvoidConfigSS::Datapath - } else { - GenericCongAvoidConfigSS::Ccp - }, - use_compensation: args.is_present("compensate_update"), - deficit_timeout: u32::from_str_radix(args.value_of("deficit_timeout").unwrap(), 10)?, - logger: logger.into(), - alg: A::with_args(args), - }) - } -} - -pub struct Flow { - alg: A, - deficit_timeout: u32, - init_cwnd: u32, - report_option: GenericCongAvoidConfigReport, - ss_thresh: u32, - use_compensation: bool, - control_channel: Datapath, - logger: Option, - - curr_cwnd_reduction: u32, - last_cwnd_reduction: time::Timespec, - - in_startup: bool, - mss: u32, - rtt: u32, - sc: Scope, -} - -impl portus::Flow for Flow { - fn on_report(&mut self, sock_id: u32, m: Report) { - let mut ms = self.get_fields(&m); - self.logger.as_ref().map(|log| { - debug!(log, "got ack"; - "acked(pkts)" => ms.acked / self.mss, - "curr_cwnd (pkts)" => self.alg.curr_cwnd() / self.mss, - "inflight (pkts)" => ms.inflight / self.mss, - "loss" => ms.loss, - "ssthresh" => self.ss_thresh, - "rtt" => ms.rtt, - "sid" => sock_id, - ); - }); - - if self.in_startup { - // install new fold - match self.report_option { - GenericCongAvoidConfigReport::Ack => { - self.sc = self.install_ack_update(); - } - GenericCongAvoidConfigReport::Rtt => { - self.sc = self.install_datapath_interval_rtt(); - } - GenericCongAvoidConfigReport::Interval(i) => { - self.sc = self.install_datapath_interval(i); - } - } - - self.alg.set_cwnd(ms.inflight); // * self.mss); - self.in_startup = false; - } - - self.rtt = ms.rtt; - if ms.was_timeout { - self.handle_timeout(); - return; - } - - ms.acked = self.slow_start_increase(ms.acked); - - // increase the cwnd corresponding to new in-order cumulative ACKs - self.alg.increase(&ms); - self.maybe_reduce_cwnd(&ms); - if self.curr_cwnd_reduction > 0 { - //self.logger.as_ref().map(|log| { - // debug!(log, "cwnd re"; - // "cwnd" => self.alg.curr_cwnd() / self.mss, - // "acked" => ms.acked / self.mss, - // "deficit" => self.curr_cwnd_reduction); - //}); - return; - } - - self.update_cwnd(); - } -} - -impl Flow { - /// Make no updates in the datapath, and send a report after an interval - fn install_datapath_interval(&mut self, interval: time::Duration) -> Scope { - self.control_channel - .set_program( - "DatapathIntervalProg", - Some(&[("reportTime", interval.num_microseconds().unwrap() as u32)][..]), - ) - .unwrap() - } - - /// Make no updates in the datapath, and send a report after each RTT - fn install_datapath_interval_rtt(&mut self) -> Scope { - self.control_channel - .set_program("DatapathIntervalRTTProg", None) - .unwrap() - } - - /// Make no updates in the datapath, but send a report on every ack. - fn install_ack_update(&mut self) -> Scope { - self.control_channel - .set_program("AckUpdateProg", None) - .unwrap() - } - - /// Don't update acked, since those acks are already accounted for in slow start. - /// Send a report once there is a drop or timeout. - fn install_ss_update(&mut self) -> Scope { - self.control_channel - .set_program("SSUpdateProg", None) - .unwrap() - } - - fn update_cwnd(&self) { - if let Err(e) = self - .control_channel - .update_field(&self.sc, &[("Cwnd", self.alg.curr_cwnd())]) - { - self.logger.as_ref().map(|log| { - warn!(log, "Cwnd update error"; - "err" => ?e, - ); - }); - } - } - - fn get_fields(&mut self, m: &Report) -> GenericCongAvoidMeasurements { - let sc = &self.sc; - let ack = m - .get_field(&String::from("Report.acked"), sc) - .expect("expected acked field in returned measurement") as u32; - - let sack = m - .get_field(&String::from("Report.sacked"), sc) - .expect("expected sacked field in returned measurement") as u32; - - let was_timeout = - m.get_field(&String::from("Report.timeout"), sc) - .expect("expected timeout field in returned measurement") as u32; - - let inflight = - m.get_field(&String::from("Report.inflight"), sc) - .expect("expected inflight field in returned measurement") as u32; - - let loss = m - .get_field(&String::from("Report.loss"), sc) - .expect("expected loss field in returned measurement") as u32; - - let rtt = m - .get_field(&String::from("Report.rtt"), sc) - .expect("expected rtt field in returned measurement") as u32; - - GenericCongAvoidMeasurements { - acked: ack, - was_timeout: was_timeout == 1, - sacked: sack, - loss, - rtt, - inflight, - } - } - - fn handle_timeout(&mut self) { - self.ss_thresh /= 2; - if self.ss_thresh < self.init_cwnd { - self.ss_thresh = self.init_cwnd; - } - - self.alg.reset(); - self.alg.set_cwnd(self.init_cwnd); - self.curr_cwnd_reduction = 0; - - self.logger.as_ref().map(|log| { - warn!(log, "timeout"; - "curr_cwnd (pkts)" => self.init_cwnd / self.mss, - "ssthresh" => self.ss_thresh, - ); - }); - - self.update_cwnd(); - return; - } - - fn maybe_reduce_cwnd(&mut self, m: &GenericCongAvoidMeasurements) { - let old_deficit = self.curr_cwnd_reduction; - if m.loss > 0 || m.sacked > 0 { - if self.deficit_timeout > 0 - && ((time::now().to_timespec() - self.last_cwnd_reduction) - > time::Duration::microseconds( - (f64::from(self.rtt) * self.deficit_timeout as f64) as i64, - )) - { - self.curr_cwnd_reduction = 0; - } - - // if loss indicator is nonzero - // AND the losses in the lossy cwnd have not yet been accounted for - // OR there is a partial ACK AND cwnd was probing ss_thresh - if m.loss > 0 && self.curr_cwnd_reduction == 0 - //|| (m.acked > 0 && self.alg.curr_cwnd() == self.ss_thresh) - { - //self.logger.as_ref().map(|log| { - // info!(log, "reduction"; - // "loss" => m.loss, - // "deficit" => self.curr_cwnd_reduction, - // "sacked" => m.sacked, - // "acked" => m.acked, - // "cwnd" => self.alg.curr_cwnd(), - // "ssthresh" => self.ss_thresh, - // ); - //}); - self.alg.reduction(m); - self.last_cwnd_reduction = time::now().to_timespec(); - self.ss_thresh = self.alg.curr_cwnd(); - self.update_cwnd(); - } - - self.curr_cwnd_reduction += m.sacked + m.loss; - } else if m.acked < self.curr_cwnd_reduction { - self.curr_cwnd_reduction -= (m.acked as f32 / self.mss as f32) as u32; - } else { - self.curr_cwnd_reduction = 0; - } - if old_deficit > 0 || self.curr_cwnd_reduction > 0 { - //self.logger.as_ref().map(|log| { - // info!(log, "deficit"; - // "old" => old_deficit, - // "new" => self.curr_cwnd_reduction, - // ); - //}); - } - } - - fn slow_start_increase(&mut self, acked: u32) -> u32 { - let mut new_bytes_acked = acked; - if self.alg.curr_cwnd() < self.ss_thresh { - // increase cwnd by 1 per packet, until ssthresh - if self.alg.curr_cwnd() + new_bytes_acked > self.ss_thresh { - new_bytes_acked -= self.ss_thresh - self.alg.curr_cwnd(); - self.alg.set_cwnd(self.ss_thresh); - } else { - let curr_cwnd = self.alg.curr_cwnd(); - if self.use_compensation { - // use a compensating increase function: deliberately overshoot - // the "correct" update to keep account for lost throughput due to - // infrequent updates. Usually this doesn't matter, but it can when - // the window is increasing exponentially (slow start). - let delta = f64::from(new_bytes_acked) / (2.0_f64).ln(); - self.alg.set_cwnd(curr_cwnd + delta as u32); - // let ccp_rtt = (rtt_us + 10_000) as f64; - // let delta = ccp_rtt * ccp_rtt / (rtt_us as f64 * rtt_us as f64); - // self.cwnd += (new_bytes_acked as f64 * delta) as u32; - } else { - self.alg.set_cwnd(curr_cwnd + new_bytes_acked); - } - - new_bytes_acked = 0 - } - } - - new_bytes_acked - } -} diff --git a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/reno.rs b/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/reno.rs deleted file mode 100644 index 3dfa735a4..000000000 --- a/quic/congestion_control/third_party/ccp/ccp_generic_cong_avoid/src/reno.rs +++ /dev/null @@ -1,55 +0,0 @@ -extern crate portus_export; -extern crate slog; - -pub use crate::GenericCongAvoidAlg; -pub use crate::GenericCongAvoidFlow; -pub use crate::GenericCongAvoidMeasurements; - -#[derive(Default)] -pub struct Reno { - mss: u32, - init_cwnd: f64, - cwnd: f64, -} - -impl GenericCongAvoidAlg for Reno { - type Flow = Self; - - fn name() -> &'static str { - "reno" - } - - fn with_args(_: &clap::ArgMatches) -> Self { - Default::default() - } - - fn new_flow(&self, _logger: Option, init_cwnd: u32, mss: u32) -> Self::Flow { - Reno { - mss, - init_cwnd: f64::from(init_cwnd), - cwnd: f64::from(init_cwnd), - } - } -} - -impl GenericCongAvoidFlow for Reno { - fn curr_cwnd(&self) -> u32 { - self.cwnd as u32 - } - - fn set_cwnd(&mut self, cwnd: u32) { - self.cwnd = f64::from(cwnd); - } - - fn increase(&mut self, m: &GenericCongAvoidMeasurements) { - // increase cwnd by 1 / cwnd per packet - self.cwnd += 3.0 * f64::from(self.mss) * (f64::from(m.acked) / self.cwnd); - } - - fn reduction(&mut self, _m: &GenericCongAvoidMeasurements) { - self.cwnd /= 2.0; - if self.cwnd <= self.init_cwnd { - self.cwnd = self.init_cwnd; - } - } -} diff --git a/quic/congestion_control/third_party/ccp/lib.rs b/quic/congestion_control/third_party/ccp/lib.rs deleted file mode 100644 index 2ba6f0952..000000000 --- a/quic/congestion_control/third_party/ccp/lib.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Step 1/2: Add new alg crates here. -// -// NOTE: Must be listed in TARGETS deps -extern crate ccp_const; -extern crate ccp_generic_cong_avoid; - -extern crate clap; -extern crate portus; - -use libc::c_char; -use portus::ipc::unix::Socket as S; -use portus::ipc::Blocking as B; -use portus::{CongAlg, CongAlgBuilder}; -use std::sync::{atomic, Arc}; -use std::ffi::CStr; -use std::fs::File; -use std::os::unix::io::FromRawFd; - -use ccp_generic_cong_avoid::{cubic::Cubic, reno::Reno}; - -fn _start(args: String, out: Option, uid: u64, handle: *const atomic::AtomicBool) { - // Parse config arguments - let argv = args.split_whitespace(); - let alg_name = argv.clone().next().expect("empty argument string"); - let log = out.map_or_else( - || portus::algs::make_logger(), - |f| portus::algs::make_file_logger(f), - ); - - let portus_bindaddr = format!("{}/{}", uid.to_string(), "portus"); - let backend = S::::new(&portus_bindaddr) - .map(|sk| portus::ipc::BackendBuilder { sock : sk }) - .expect("ipc initialization"); - - let mut available_algs = vec![]; - - macro_rules! register_alg { - ($alg:ident) => { - let reg_name = <$alg::__ccp_alg_export as CongAlg>>::name(); - available_algs.push(reg_name); - if alg_name == reg_name { - let args = $alg::__ccp_alg_export::args(); - let matches = args.get_matches_from(argv); - let alg = - $alg::__ccp_alg_export::with_arg_matches(&matches, Some(log.clone())).unwrap(); - portus::run_with_handle::<_, _>(backend, portus::Config { logger : Some(log) }, alg, handle).unwrap(); - return; - } - }; - ($pkg:ident, $base:ident, $alg:ident) => { - let reg_name = <$pkg::$base<$alg> as CongAlg>>::name(); - available_algs.push(reg_name); - if alg_name == reg_name { - let args = <$pkg::$base<$alg> as CongAlgBuilder<'_, '_>>::args(); - let matches = args.get_matches_from(argv); - let alg = <$pkg::$base<$alg> as CongAlgBuilder<'_, '_>>::with_arg_matches( - &matches, - Some(log.clone()), - ) - .unwrap(); - portus::run_with_handle::<_, _>(backend, portus::Config { logger : Some(log) }, alg, handle).unwrap(); - return; - } - }; - } - - register_alg!(ccp_const); - register_alg!(ccp_generic_cong_avoid, Alg, Reno); - register_alg!(ccp_generic_cong_avoid, Alg, Cubic); - - unreachable!("error: algorithm '{}' not found! available algorithms: {:#?}", alg_name, available_algs); -} - -#[no_mangle] -pub extern "C" fn ccp_create_handle() -> *const atomic::AtomicBool { - let handle = Arc::new(atomic::AtomicBool::new(true)); - println!("{:#?}", handle); - - return Arc::into_raw(handle) -} - -#[no_mangle] -pub extern "C" fn ccp_spawn(c_args: *const c_char, log_fd: i32, uid: u64, handle: *const atomic::AtomicBool) { - let args = unsafe { CStr::from_ptr(c_args) } - .to_string_lossy() - .into_owned(); - let f = unsafe { File::from_raw_fd(log_fd) }; - _start(args, Some(f), uid, handle) -} - -#[no_mangle] -pub extern "C" fn ccp_kill(handle: *const atomic::AtomicBool) { - let a = unsafe { Arc::from_raw(handle) }; - a.store(false, std::sync::atomic::Ordering::SeqCst); - // forget so we don't drop it automatically at the end, calling code has ownership - let _ = Arc::into_raw(a); -} diff --git a/quic/congestion_control/third_party/ccp/libstartccp.h b/quic/congestion_control/third_party/ccp/libstartccp.h deleted file mode 100644 index 1512fdced..000000000 --- a/quic/congestion_control/third_party/ccp/libstartccp.h +++ /dev/null @@ -1,7 +0,0 @@ -#include - -extern "C" { -bool *ccp_create_handle(); -void ccp_spawn(const char* args, uint32_t log_fd, uint64_t uid, bool *handle); -void ccp_kill(bool *handle); -} diff --git a/quic/server/CCPReader.cpp b/quic/server/CCPReader.cpp deleted file mode 100644 index 21d624992..000000000 --- a/quic/server/CCPReader.cpp +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#include - -#include -#include -#include - -#ifdef CCP_ENABLED -#include -#include -#endif - -#include - -#define CCP_UNIX_BASE "/ccp/" -#define FROM_CCP "mvfst" -#define TO_CCP "portus" -#define CCP_MAX_MSG_SIZE 32678 - -namespace quic { - -/** - * In order to keep logic simple for callers of CCPReader (eg. QuicServer), this - * implementation internalizes checks for whether or not ccp is enabled. If ccp - * is enabled, the functions are defined as expected. If not, the functions are - * just empty stubs that ignore their args and do nothing (see bottom). - */ -#ifdef CCP_ENABLED - -/* - * libccp callbacks - * - * These functions are not called by CCPReader directly. We pass a reference of - * them to libccp so that it has a simple interface to modify state (eg. cwnd) - * within the datapath as necessary. - */ -extern "C" { - -/** - * In libccp, all time is relative to process start. - * NOTE: This is a global variable (as opposed to a field in CCPReader), because - * the time functions, such as libccp::now, are stateless and take no - * parameters. - * TODO: in a future version, libccp should pass a pointer to the datapath - * struct, so we can recover the corresponding CCPReader and keep this variable - * as a field there instead. - */ -uint64_t init_time_ns = 0; - -void _ccp_set_cwnd(struct ccp_connection* conn, uint32_t cwnd) { - // Look up the CCP cc algorithm instance for this connection - CCP* alg = (CCP*)ccp_get_impl(conn); - alg->setCongestionWindow(cwnd); -} - -void _ccp_set_rate_abs(struct ccp_connection* conn, uint32_t rate) { - CCP* alg = (CCP*)ccp_get_impl(conn); - alg->setPacingRate(rate); -} - -int _ccp_send_msg(struct ccp_connection* conn, char* msg, int msg_size) { - std::unique_ptr buf = folly::IOBuf::wrapBuffer(msg, msg_size); - // Since this function is called by libccp, we are not within the CCPReader - // and thus need to look up a reference to it. This is stored within the - // datapath object created in CCPReader::try_initialize. - CCPReader* reader = (CCPReader*)conn->datapath->impl; - auto ret = reader->writeOwnedBuffer(std::move(buf)); - if (ret < 0) { - LOG(ERROR) << "ccp_send_msg failed ret=" << ret << " errno=" << errno; - } - return ret; -} - -static void _ccp_log( - struct ccp_datapath* /*dp*/, - enum ccp_log_level level, - const char* msg, - int /*msg_size*/) { - LOG(ERROR) << "[libccp." << level << "] " << msg; -} - -// The next 3 functions are used to provide libccp with a sense of time. -// It uses these functions to implement timers, so the timers can only be -// as accurate as the notion of time provided here. -uint64_t _ccp_now_usecs() { - struct timespec now; - uint64_t now_ns, now_us; - - clock_gettime(CLOCK_MONOTONIC, &now); - - now_ns = (1000000000L * now.tv_sec) + now.tv_nsec; - if (init_time_ns == 0) { - init_time_ns = now_ns; - } - - now_us = ((now_ns - init_time_ns) / 1000) & 0xffffffff; - return now_us; -} - -uint64_t _ccp_since_usecs(uint64_t then) { - return _ccp_now_usecs() - then; -} - -uint64_t _ccp_after_usecs(uint64_t usecs) { - return _ccp_now_usecs() + usecs; -} -} // end libccp callbacks - -CCPReader::CCPReader() = default; - -void CCPReader::try_initialize( - folly::EventBase* evb, - uint64_t ccpId, - uint64_t parentServerId, - uint8_t parentWorkerId) { - evb_ = evb; - ccpId_ = ccpId; - serverId_ = parentServerId; - workerId_ = parentWorkerId; - - // Even though it is technically called an Async*UDP*Socket by folly, - // this is really a unix socket! - ccpSocket_ = std::make_unique(evb_); - - bind(); - - // libccp asks us to allocate this struct manually (as opposed to allocating - // it internally as it does for other things), to allow the max connections to - // be configurable. This array is used by libccp to keep track of all the - // connections that are currently active. The index is the connection id. - struct ccp_connection* active_connections = (struct ccp_connection*)calloc( - MAX_CONCURRENT_CONNECTIONS_LIBCCP, sizeof(struct ccp_connection)); - if (active_connections == nullptr) { - LOG(ERROR) << "[ccp] failed to allocate per-connection data structure"; - return; - } - - // Giving libccp a reference to all of the callbacks - ccpDatapath_.set_cwnd = &_ccp_set_cwnd; - ccpDatapath_.set_rate_abs = &_ccp_set_rate_abs; - ccpDatapath_.send_msg = &_ccp_send_msg; - ccpDatapath_.log = &_ccp_log; - ccpDatapath_.now = &_ccp_now_usecs; - ccpDatapath_.since_usecs = &_ccp_since_usecs; - ccpDatapath_.after_usecs = &_ccp_after_usecs; - - ccpDatapath_.impl = (void*)this; - ccpDatapath_.ccp_active_connections = active_connections; - ccpDatapath_.max_connections = MAX_CONCURRENT_CONNECTIONS_LIBCCP; - ccpDatapath_.max_programs = MAX_DATAPATH_PROGRAMS_LIBCCP; - ccpDatapath_.fto_us = FALLBACK_TIMEOUT_US_LIBCCP; - - // This function registers us (QuicServerWorker+CCPReader) as a "datapath" - // within libccp. Libccp itself is stateless, so all of our state is - // maintained in this struct and thus must be passed to any future calls to - // libccp. - int ret = ccp_init(&ccpDatapath_); - if (ret < 0) { - LOG(ERROR) << "[ccp] ccp_init failed ret=" << ret; - throw std::runtime_error("internal bug: unable to interface with libccp"); - } -} - -int CCPReader::connect() { - // This message registers us with CCP. CCP ignores messages from - // datapaths that have not yet registered with it. It identifies a datapath - // by the sending address (/ccp/mvfst{id}). - char ready_buf[READY_MSG_SIZE]; - int wrote = write_ready_msg(ready_buf, READY_MSG_SIZE, workerId_); - std::unique_ptr ready_msg = - folly::IOBuf::wrapBuffer(ready_buf, wrote); - int ret = writeOwnedBuffer(std::move(ready_msg)); - // Since we start ccp within a separate thread from QuicServer, its possible - // that it hasn't been scheduled yet when we send this message, in which case - // we will get an unable to connet to socket error (111). If this happens, - // we return a failure and expected the caller to wait and retry later. - // If we can't connect after a few tries then something is probably wrong - // with CCP and we should give up. - if (ret < 0) { - LOG(ERROR) << "[ccp] write_ready_msg failed ret=" << ret - << " errno=" << errno; - } else { - LOG(INFO) << "[ccp] write_ready_msg success"; - initialized_ = true; - } - return ret; -} - -folly::EventBase* CCPReader::getEventBase() const { - return evb_; -} - -/** - * We communicate with CCP via unix domain sockets. - * CCP owns the address /ccp/portus - * Each QuicServerWorker has its own address, distinguished by worker id: - * ccp/mvfst{id} - */ -void CCPReader::bind() { - CHECK(ccpSocket_); - - // In order to prevent permission issues in production environments, we use - // "abstract" sockets, which do not actually create an entry in the - // filesystem. An abstract socket is created by prepending a nullbyte to the - // desired path name. FYI, when displayed by system utilities, abstract socket - // names appear with an @ as the first character to denote the null byte. - std::string recvPath(1, 0); - recvPath.append(CCP_UNIX_BASE); - recvPath.append( - std::to_string(ccpId_) + "/" + std::to_string(serverId_) + "/"); - recvPath.append(FROM_CCP); - recvPath.append(std::to_string(workerId_)); - recvAddr_.setFromPath(recvPath); - ccpSocket_->bind(recvAddr_); - - // Again we start the address with a null byte here to make it abstract - // Since these are connectionless sockets, we don't need to connect to this - // address. - std::string sendPath(1, 0); - sendPath.append(CCP_UNIX_BASE); - sendPath.append(std::to_string(ccpId_) + "/"); - sendPath.append(TO_CCP); - sendAddr_.setFromPath(sendPath); -} - -void CCPReader::start() { - ccpSocket_->resumeRead(this); -} - -void CCPReader::pauseRead() { - CHECK(ccpSocket_); - ccpSocket_->pauseRead(); -} - -void CCPReader::getReadBuffer(void** buf, size_t* len) noexcept { - // TODO should this be initialized once in the constructor and re-used here? - readBuffer_ = folly::IOBuf::create(CCP_MAX_MSG_SIZE); - *buf = readBuffer_->writableData(); - *len = CCP_MAX_MSG_SIZE; -} - -void CCPReader::onDataAvailable( - const folly::SocketAddress& /*client*/, - size_t len, - bool truncated, - OnDataAvailableParams /*params*/) noexcept { - // TODO if read buffer is re-used, shouldn't move it here - // Move readBuffer_ first to get rid of it immediately so that if we return - // early, we've flushed it. - Buf data = std::move(readBuffer_); - if (truncated || len <= 0) { - // This is an error, drop the packet. - return; - } - - // TODO is this needed? - // data->append(len); - char* buf = (char*)data->data(); - int ret = ccp_read_msg(&ccpDatapath_, buf, len); - - // After a connection ends, we may get 1 more message from ccp about it, - // but we've already removed it from our local list of connections, - // so libccp will return LIBCCP_UNKNOWN_CONNECTION, which we can ignore - if (ret < 0 && ret != LIBCCP_UNKNOWN_CONNECTION) { - LOG(ERROR) << "ccp_read_msg failed ret=" << ret; - } -} - -void CCPReader::onReadError(const folly::AsyncSocketException& ex) noexcept { - LOG(ERROR) << "ccpReader onReadError: " << ex.what(); -} - -void CCPReader::onReadClosed() noexcept { - shutdown(); -} - -ssize_t CCPReader::writeOwnedBuffer(std::unique_ptr buf) { - return ccpSocket_->write(sendAddr_, buf); -} - -uint8_t CCPReader::getWorkerId() const noexcept { - return workerId_; -} - -struct ccp_datapath* CCPReader::getDatapath() noexcept { - return initialized_ ? &ccpDatapath_ : nullptr; -} - -void CCPReader::shutdown() { - if (ccpSocket_) { - try { - ccpSocket_->pauseRead(); - ccpSocket_->close(); - } catch (...) { - } - ccpSocket_ = nullptr; - } - if (ccpDatapath_.ccp_active_connections) { - free(ccpDatapath_.ccp_active_connections); - } - ccp_free(&ccpDatapath_); -} - -CCPReader::~CCPReader() { - // We allocated the list of active connections and ccp_datapath struct, - // so we are responsible for freeing them - shutdown(); -} - -#else // Empty method placeholders for when ccp is not enabled: - -CCPReader::CCPReader() = default; -void CCPReader::try_initialize( - folly::EventBase* evb, - uint64_t, - uint64_t, - uint8_t) { - evb_ = evb; -} -int CCPReader::connect() { - return 0; -} -folly::EventBase* CCPReader::getEventBase() const { - return evb_; -} -void CCPReader::bind() {} -void CCPReader::start() {} -void CCPReader::pauseRead() {} -void CCPReader::getReadBuffer(void**, size_t*) noexcept {} -void CCPReader::onDataAvailable( - const folly::SocketAddress&, - size_t, - bool, - OnDataAvailableParams) noexcept {} -void CCPReader::onReadError(const folly::AsyncSocketException&) noexcept {} -void CCPReader::onReadClosed() noexcept {} -ssize_t CCPReader::writeOwnedBuffer(std::unique_ptr) { - return 0; -} -uint8_t CCPReader::getWorkerId() const noexcept { - return workerId_; -} -void CCPReader::shutdown() {} -CCPReader::~CCPReader() = default; - -#endif - -} // namespace quic diff --git a/quic/server/CCPReader.h b/quic/server/CCPReader.h deleted file mode 100644 index 1efe50eed..000000000 --- a/quic/server/CCPReader.h +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#pragma once - -#include -#include - -#ifdef CCP_ENABLED -#include -#endif - -/** - * Libccp internally allocates a fixed-size array to manage state for active - * connections. This number defines the size of that array and thus the maximum - * number of connections we expect to be active at any one time. Once exceeded, - * ccp will ignore new connections. - * TODO add error handling for this case - */ -#define MAX_CONCURRENT_CONNECTIONS_LIBCCP 1024 -/** - * This is the maximum number of datapath programs a ccp algorithm may - * install. 10 should be sufficient, the average algorithm only needs to - * install 1, but more complicated algorithms may use a few. - */ -#define MAX_DATAPATH_PROGRAMS_LIBCCP 10 -/** - * - * This is the fallback timeout length, in microseconds. If libccp hasn't - * received a response from CCP within this amount of time, it enters fallback - * mode, and all future calls to libccp return an error until connection with - * ccp is restored. This logic is handled within QuicCCP. Note that libccp - * checks for this timeout using the time callbacks above, so its ability to - * achieve this value exactly is limited by the resolution of those functions. - */ -#define FALLBACK_TIMEOUT_US_LIBCCP 1000000 - -namespace quic { - -/** - * Each instance of QuicServerWorker is considered a separate "datapath" by CCP - * and thus communicates with CCP independently in order to prevent the need for - * any coordination between workers. - * - * Each worker has its own corresponding instance of CCPReader to handle - * recieving messages from CCP and dispatching them to the correct connection - * (the cc algorithm implementation, QuicCCP, handles sending messages to CCP). - * This CCPReader runs in the same event base as the worker. - * - * The current implementation assumes communication with CCP via unix sockets. - * CCP can be configured to use other IPC mechanisms if necessary. - * - */ -class CCPReader : public folly::AsyncUDPSocket::ReadCallback { - public: - explicit CCPReader(); - ~CCPReader() override; - - // Initialize state (with libccp) and send a ready message to CCP - void try_initialize( - folly::EventBase* evb, - uint64_t ccpId, - uint64_t serverId, - uint8_t workerId); - // Start listening on the socket - void start(); - // Send a ready message to CCP - int connect(); - // Pause listening on the socket - void pauseRead(); - - // Standard ReadCallback functions - void getReadBuffer(void** buf, size_t* len) noexcept override; - void onDataAvailable( - const folly::SocketAddress& client, - size_t len, - bool truncated, - OnDataAvailableParams params) noexcept override; - void onReadError(const folly::AsyncSocketException& ex) noexcept override; - void onReadClosed() noexcept override; - - // The id of the corresponding worker thread (running in the same evb) - FOLLY_NODISCARD uint8_t getWorkerId() const noexcept; - - FOLLY_NODISCARD folly::EventBase* getEventBase() const; - // Send a message to CCP at the unix socket /ccp/portus from our address - // /ccp/mvfst{id} - ssize_t writeOwnedBuffer(std::unique_ptr buf); - - // Get a reference to the corresponding ccp_datapath struct - // (libccp's wrapper around a QuicServerWorker), needed - // for interacting with libccp. This gets passed to CCP alg instances - // by the QuicServerConnectionState object. -#ifdef CCP_ENABLED - struct ccp_datapath* FOLLY_NULLABLE getDatapath() noexcept; -#endif - - void shutdown(); - - private: - // Bind to our unix socket address - void bind(); - - // Each ccp reader is uniquely identified by its parent - // QuicServer/QuicServerWorker pair - uint8_t workerId_; - folly::SocketAddress sendAddr_; - folly::SocketAddress recvAddr_; - std::unique_ptr ccpSocket_; - folly::EventBase* evb_{nullptr}; - std::unique_ptr readBuffer_; -#ifdef CCP_ENABLED - struct ccp_datapath ccpDatapath_; -#endif -}; - -} // namespace quic diff --git a/quic/server/CMakeLists.txt b/quic/server/CMakeLists.txt index a011a5411..600bc7451 100644 --- a/quic/server/CMakeLists.txt +++ b/quic/server/CMakeLists.txt @@ -45,8 +45,6 @@ add_library( QuicServerPacketRouter.cpp QuicServerTransport.cpp QuicServerWorker.cpp - CCPReader.cpp - QuicCcpThreadLauncher.cpp SlidingWindowRateLimiter.cpp handshake/DefaultAppTokenValidator.cpp handshake/TokenGenerator.cpp @@ -120,18 +118,6 @@ add_dependencies( mvfst_state_ack_handler ) -if(DEFINED CCP_ENABLED) - # This target is generated by running cargo, so cmake needs to know where - # to find it. See quic/congestion_control/third_party/ccp/CMakeLists.txt - # When CCP is not enabled, LIBSTARTCCP_LIBRARY is empty, and thus ignored - # during linking. - get_target_property(LIBSTARTCCP_LIBRARY libstartccp LOCATION) - - # This target is only defined when CCP_ENABLED, so we can ony add it here. - add_dependencies(mvfst_server libstartccp) - add_dependencies(mvfst_server_state libstartccp) -endif() - target_link_libraries( mvfst_server_state PUBLIC Folly::folly @@ -146,8 +132,6 @@ target_link_libraries( mvfst_state_ack_handler mvfst_transport mvfst_transport_knobs - ${LIBCCP_LIBRARY} - ${LIBSTARTCCP_LIBRARY} ) target_link_libraries( @@ -166,8 +150,6 @@ target_link_libraries( mvfst_state_datagram_handler mvfst_transport mvfst_transport_knobs - ${LIBCCP_LIBRARY} - ${LIBSTARTCCP_LIBRARY} ) target_link_libraries( diff --git a/quic/server/QuicCcpThreadLauncher.cpp b/quic/server/QuicCcpThreadLauncher.cpp deleted file mode 100644 index 3dde4ba80..000000000 --- a/quic/server/QuicCcpThreadLauncher.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#include - -namespace quic { - -void QuicCcpThreadLauncher::start( - FOLLY_MAYBE_UNUSED const std::string& ccpConfig) { -#ifdef CCP_ENABLED - ccpId_ = folly::Random::secureRand64(); - handle_ = ccp_create_handle(); - ccpEvb_.getEventBase()->runInEventBaseThread([&, handle = handle_] { - // NOTE second arg is fd of output device - // hardcoded to 2 for stderr - ccp_spawn(ccpConfig.c_str(), 2, ccpId_, handle); - }); -#else - VLOG(2) << "WARN: tried to launch ccp, but ccp not enabled"; -#endif -} - -bool QuicCcpThreadLauncher::hasLaunched() { - return ccpId_ != 0; -} - -uint64_t QuicCcpThreadLauncher::getCcpId() { -#ifdef CCP_ENABLED - return ccpId_; -#else - return 0; -#endif -} - -void QuicCcpThreadLauncher::stop() { -#ifdef CCP_ENABLED - if (hasLaunched_()) { - ccp_kill(handle_); - } -#else -#endif -} -} // namespace quic diff --git a/quic/server/QuicCcpThreadLauncher.h b/quic/server/QuicCcpThreadLauncher.h deleted file mode 100644 index 6a667da60..000000000 --- a/quic/server/QuicCcpThreadLauncher.h +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#pragma once - -#ifdef CCP_ENABLED -#include -#endif -#include -#include -#include -#include - -namespace quic { - -class QuicCcpThreadLauncher { - public: - QuicCcpThreadLauncher() { - ccpId_ = 0; - } - - void start(const std::string& ccpConfig); - bool hasLaunched(); - uint64_t getCcpId(); - void stop(); - - private: -#ifdef CCP_ENABLED - folly::ScopedEventBaseThread ccpEvb_; -#endif - uint64_t ccpId_; -}; - -} // namespace quic diff --git a/quic/server/QuicServer.cpp b/quic/server/QuicServer.cpp index 08188f0e3..2fd826560 100644 --- a/quic/server/QuicServer.cpp +++ b/quic/server/QuicServer.cpp @@ -12,15 +12,11 @@ #include #include #include -#include -#ifdef CCP_ENABLED -#include -#endif -#include #include #include #include #include +#include FOLLY_GFLAGS_DEFINE_bool( qs_io_uring_use_async_recv, @@ -235,19 +231,12 @@ void QuicServer::bindWorkersToSocket( auto numWorkers = evbs.size(); CHECK(!initialized_); boundAddress_ = address; - auto usingCCP = isUsingCCP(); - if (!usingCCP) { - VLOG(2) << "NOT using CCP"; - } - auto ccpInitFailed = false; for (size_t i = 0; i < numWorkers; ++i) { auto workerEvb = evbs[i]; workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait( [self = this->shared_from_this(), workerEvb, numWorkers, - usingCCP, - &ccpInitFailed, processId = processId_, idx = i] { std::lock_guard guard(self->startMutex_); @@ -288,28 +277,6 @@ void QuicServer::bindWorkersToSocket( self->boundAddress_ = worker->getAddress(); } } - if (usingCCP) { - auto serverId = self->boundAddress_.getIPAddress().hash() | - self->boundAddress_.getPort(); - try { - worker->getCcpReader()->try_initialize( - worker->getEventBase(), - self->ccpId_, - serverId, - worker->getWorkerId()); - worker->getCcpReader()->connect(); - } catch (const folly::AsyncSocketException& ex) { - // probably means the unix socket failed to bind - LOG(ERROR) << "exception while initializing ccp: " << ex.what() - << "\nshutting down..."; - // TODO also update counters - ccpInitFailed = true; - } catch (const std::exception& ex) { - LOG(ERROR) << "exception initializing ccp: " << ex.what() - << "\nshutting down..."; - ccpInitFailed = true; - } - } if (idx == (numWorkers - 1)) { VLOG(4) << "Initialized all workers in the eventbase"; self->initialized_ = true; @@ -317,9 +284,6 @@ void QuicServer::bindWorkersToSocket( } }); } - if (usingCCP && ccpInitFailed) { - shutdown(); - } } void QuicServer::start() { @@ -331,14 +295,9 @@ void QuicServer::start() { workerPtr_.reset( worker, [](auto /* worker */, folly::TLPDestructionMode) {}); }); - auto usingCCP = isUsingCCP(); for (auto& worker : workers_) { - worker->getEventBase()->runInEventBaseThread([&worker, usingCCP] { - if (usingCCP) { - worker->getCcpReader()->start(); - } - worker->start(); - }); + worker->getEventBase()->runInEventBaseThread( + [&worker] { worker->start(); }); } } @@ -509,13 +468,9 @@ void QuicServer::shutdown(LocalErrorCode error) { return; } shutdown_ = true; - auto usingCCP = isUsingCCP(); for (auto& worker : workers_) { worker->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&] { worker->shutdownAllConnections(error); - if (usingCCP) { - worker->getCcpReader()->shutdown(); - } workerPtr_.reset(); }); // protecting the erase in map with the mutex since @@ -626,30 +581,6 @@ void QuicServer::setTransportSettings(TransportSettings transportSettings) { }); } -void QuicServer::setCcpId(uint64_t ccpId) { - ccpId_ = ccpId; -} - -bool QuicServer::isUsingCCP() { - auto foundId = ccpId_ != 0; -#ifdef CCP_ENABLED - auto default_is_ccp = transportSettings_.defaultCongestionController == - CongestionControlType::CCP; - if (foundId && default_is_ccp) { - return true; - } else { - return false; - } - return foundId; -#else - if (foundId) { - LOG(ERROR) - << "found ccp id, but server was not compiled with ccp support. recompile with -DCCP_ENABLED."; - } - return false; -#endif -} - void QuicServer::rejectNewConnections(std::function rejectFn) { rejectNewConnections_ = rejectFn; runOnAllWorkers([rejectFn](auto worker) mutable { diff --git a/quic/server/QuicServer.h b/quic/server/QuicServer.h index 459492685..5987515ff 100644 --- a/quic/server/QuicServer.h +++ b/quic/server/QuicServer.h @@ -179,14 +179,6 @@ class QuicServer : public QuicServerWorker::WorkerCallback, */ void setTransportSettings(TransportSettings transportSettings); - /** - * If the calling application wants to use CCP for CC, it's the - * app's responsibility to start an instance of CCP -- this ID - * refers to that unique instance of CCP so we (QuicServer) know - * how to connect to it. - */ - void setCcpId(uint64_t ccpId); - /** * Tells the server to start rejecting any new connection. The parameter * function is stored and evaluated on each new connection before being @@ -409,8 +401,6 @@ class QuicServer : public QuicServerWorker::WorkerCallback, QuicVersion::QUIC_V1_ALIAS, QuicVersion::QUIC_DRAFT}}; - bool isUsingCCP(); - std::atomic shutdown_{true}; std::shared_ptr ctx_; TransportSettings transportSettings_; @@ -470,14 +460,6 @@ class QuicServer : public QuicServerWorker::WorkerCallback, // Options to AsyncUDPSocket::bind, only controls IPV6_ONLY currently. folly::AsyncUDPSocket::BindOptions bindOptions_; -#ifdef CCP_ENABLED - std::unique_ptr ccpEvb_; -#endif - // Random number to uniquely identify this instance of quic to ccp - // in case there are multiple concurrent instances (e.g. when proxygen is - // migrating connections and there are two concurrent instances of proxygen) - uint64_t ccpId_{0}; - // set by getEventBaseBackend if multishot callback is // supprted bool backendSupportsMultishotCallback_{false}; diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 0e3503bf3..d8552759b 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -656,12 +656,6 @@ void QuicServerTransport::setBufAccessor(BufAccessor* bufAccessor) { conn_->bufAccessor = bufAccessor; } -#ifdef CCP_ENABLED -void QuicServerTransport::setCcpDatapath(struct ccp_datapath* datapath) { - serverConn_->ccpDatapath = datapath; -} -#endif - const std::shared_ptr QuicServerTransport::getPeerCertificate() const { const auto handshakeLayer = serverConn_->serverHandshakeLayer; @@ -721,16 +715,6 @@ void QuicServerTransport::registerAllTransportKnobParamHandlers() { if (cctype == server_conn->congestionController->type()) { return; } - if (cctype == CongestionControlType::CCP) { - bool ccpAvailable = false; -#ifdef CCP_ENABLED - ccpAvailable = server_conn->ccpDatapath != nullptr; -#endif - if (!ccpAvailable) { - LOG(ERROR) << "ccp not enabled on this server"; - return; - } - } server_conn->congestionController = server_conn->congestionControllerFactory->makeCongestionController( *server_conn, cctype); diff --git a/quic/server/QuicServerTransport.h b/quic/server/QuicServerTransport.h index ee4584a6f..d9ba2c963 100644 --- a/quic/server/QuicServerTransport.h +++ b/quic/server/QuicServerTransport.h @@ -151,15 +151,6 @@ class QuicServerTransport virtual void setBufAccessor(BufAccessor* bufAccessor); -#ifdef CCP_ENABLED - /* - * This function must be called with an initialized ccp_datapath (via - * libccp:ccp_init) before starting any connections using the CCP congestion - * control algorithm. See further notes on this struct in the header file. - */ - void setCcpDatapath(struct ccp_datapath* datapath); -#endif - const std::shared_ptr getPeerCertificate() const override; diff --git a/quic/server/QuicServerWorker.cpp b/quic/server/QuicServerWorker.cpp index 96180ea9a..de3b96f03 100644 --- a/quic/server/QuicServerWorker.cpp +++ b/quic/server/QuicServerWorker.cpp @@ -26,7 +26,6 @@ #include #include #include -#include #include #include #include @@ -58,7 +57,6 @@ QuicServerWorker::QuicServerWorker( setEventCallback_(ec), takeoverPktHandler_(this), observerList_(this) { - ccpReader_ = std::make_unique(); pending0RttData_.setPruneHook( [&](auto, auto) { QUIC_STATS(statsCallback_, onZeroRttBufferedPruned); }); } @@ -647,9 +645,6 @@ QuicServerTransport::Ptr QuicServerWorker::makeTransport( if (validNewToken) { trans->verifiedClientAddress(); } -#ifdef CCP_ENABLED - trans->setCcpDatapath(getCcpReader()->getDatapath()); -#endif trans->setCongestionControllerFactory(ccFactory_); trans->setTransportStatsCallback(statsCallback_.get()); // ok if nullptr @@ -1181,10 +1176,6 @@ void QuicServerWorker::setConnectionIdVersion( cidVersion_ = cidVersion; } -CCPReader* QuicServerWorker::getCcpReader() const noexcept { - return ccpReader_.get(); -} - void QuicServerWorker::setNewConnectionSocketFactory( QuicUDPSocketFactory* factory) { socketFactory_ = factory; diff --git a/quic/server/QuicServerWorker.h b/quic/server/QuicServerWorker.h index f9a1381a1..cf8fa73e1 100644 --- a/quic/server/QuicServerWorker.h +++ b/quic/server/QuicServerWorker.h @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -367,13 +366,6 @@ class QuicServerWorker : public folly::AsyncUDPSocket::ReadCallback, void setUnfinishedHandshakeLimit(std::function limitFn); - /* - * Get a reference to this worker's corresponding CCPReader. - * Each worker has a CCPReader that handles recieving messages from CCP - * and dispatching them to the correct connection. - */ - FOLLY_NODISCARD CCPReader* getCcpReader() const noexcept; - // Read callback void getReadBuffer(void** buf, size_t* len) noexcept override; @@ -722,8 +714,6 @@ class QuicServerWorker : public folly::AsyncUDPSocket::ReadCallback, // List of AcceptObservers AcceptObserverList observerList_; - std::unique_ptr ccpReader_; - TimePoint largestPacketReceiveTime_{TimePoint::min()}; }; diff --git a/quic/server/state/ServerStateMachine.h b/quic/server/state/ServerStateMachine.h index d3c9b01f2..962bbd16c 100644 --- a/quic/server/state/ServerStateMachine.h +++ b/quic/server/state/ServerStateMachine.h @@ -27,10 +27,6 @@ #include #include -#ifdef CCP_ENABLED -#include -#endif - #include #include #include @@ -153,16 +149,6 @@ struct QuicServerConnectionState : public QuicConnectionStateBase { // Sequence number of the last received MAX_PACING_RATE_KNOB_SEQUENCED. folly::Optional maybeLastMaxPacingRateKnobSeqNum{folly::none}; -#ifdef CCP_ENABLED - // Pointer to struct that maintains state needed for interacting with libccp. - // Once instance of this struct is created for each instance of - // QuicServerWorker (but lives in the worker's corresponding CCPReader). We - // need to store a pointer to it here, because it needs to be accessible by - // the QuicCCP congestion control algorithm, which only has access to the - // connection's QuicConnectionStateBase. - struct ccp_datapath* ccpDatapath; -#endif - folly::Optional createAndAddNewSelfConnId() override; QuicServerConnectionState( diff --git a/quic/state/TransportSettings.h b/quic/state/TransportSettings.h index aacb798d8..254d3c596 100644 --- a/quic/state/TransportSettings.h +++ b/quic/state/TransportSettings.h @@ -55,11 +55,6 @@ struct BbrConfig { folly::Optional ackFrequencyConfig; }; -struct CcpConfig { - std::string alg_name = ""; - std::string alg_args = ""; -}; - struct DatagramConfig { bool enabled{false}; bool framePerPacket{true}; @@ -209,8 +204,6 @@ struct TransportSettings { bool shouldUseRecvmmsgForBatchRecv{false}; // Config struct for BBR BbrConfig bbrConfig; - // Config struct for CCP - CcpConfig ccpConfig; // A packet is considered loss when a packet that's sent later by at least // timeReorderingThreshold * RTT is acked by peer. DurationRep timeReorderingThreshDividend{ diff --git a/quic/tools/tperf/tperf.cpp b/quic/tools/tperf/tperf.cpp index efff0cd23..ce5c2b270 100644 --- a/quic/tools/tperf/tperf.cpp +++ b/quic/tools/tperf/tperf.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -39,8 +38,7 @@ DEFINE_uint64( DEFINE_uint64(writes_per_loop, 44, "Amount of socket writes per event loop"); DEFINE_uint64(window, 1024 * 1024, "Flow control window size"); DEFINE_bool(autotune_window, true, "Automatically increase the receive window"); -DEFINE_string(congestion, "cubic", "newreno/cubic/bbr/ccp/none"); -DEFINE_string(ccp_config, "", "Additional args to pass to ccp"); +DEFINE_string(congestion, "cubic", "newreno/cubic/bbr/none"); DEFINE_bool(pacing, false, "Enable pacing"); DEFINE_uint64( max_pacing_rate, @@ -138,8 +136,8 @@ class TPerfObserver : public LegacyObserver { }; /** - * A helper accpetor observer that installs life cycle observers to - * transport upon accpet + * A helper acceptor observer that installs life cycle observers to + * transport upon accept */ class TPerfAcceptObserver : public AcceptObserver { public: @@ -483,16 +481,6 @@ class TPerfServer { server_->setCongestionControllerFactory( std::make_shared()); server_->setTransportSettings(settings); - - if (congestionControlType == quic::CongestionControlType::CCP) { -#ifdef CCP_ENABLED - quicCcpThreadLauncher_.start(FLAGS_ccp_config); - server_->setCcpId(quicCcpThreadLauncher_.getCcpId()); -#else - LOG(ERROR) - << "To use CCP you must recompile tperf and all dependencies with -DCCP_ENABLED"; -#endif - } } void start() { @@ -514,7 +502,6 @@ class TPerfServer { folly::EventBase eventBase_; std::unique_ptr acceptObserver_; std::shared_ptr server_; - quic::QuicCcpThreadLauncher quicCcpThreadLauncher_; }; class TPerfClient : public quic::QuicSocket::ConnectionSetupCallback,