1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-04-18 17:24:03 +03:00

Remove libccp from mvfst

Summary: We don't use it, and the OSS lib hasn't been updated in a while.

Reviewed By: mjoras

Differential Revision: D46707559

fbshipit-source-id: ec102a52183a736cfb1c0241600816a837062108
This commit is contained in:
Konstantin Tsoy 2023-06-15 18:17:53 -07:00 committed by Facebook GitHub Bot
parent dccfc706b5
commit adf16f9a07
37 changed files with 7 additions and 2871 deletions

View File

@ -45,16 +45,6 @@ find_package(Fizz REQUIRED)
find_package(Glog REQUIRED) find_package(Glog REQUIRED)
find_package(Threads) find_package(Threads)
if(DEFINED CCP_ENABLED)
find_package(libccp REQUIRED)
set(LIBCCP_LIBRARY libccp::ccp_static)
# mvfst implementation of ccp is guarded with ifdef CCP_ENABLED
add_compile_definitions(CCP_ENABLED=${CCP_ENABLED})
# libccp header files include extern c if __CPLUSPLUS__ is defined
add_compile_definitions(__CPLUSPLUS__=1)
endif()
SET(GFLAG_DEPENDENCIES "") SET(GFLAG_DEPENDENCIES "")
SET(QUIC_EXTRA_LINK_LIBRARIES "") SET(QUIC_EXTRA_LINK_LIBRARIES "")
SET(QUIC_EXTRA_INCLUDE_DIRECTORIES "") SET(QUIC_EXTRA_INCLUDE_DIRECTORIES "")

View File

@ -25,14 +25,13 @@ Usage ${0##*/} [-h|?] [-p PATH] [-i INSTALL_PREFIX]
-m (optional): Build folly without jemalloc -m (optional): Build folly without jemalloc
-s (optional): Skip installing system package dependencies -s (optional): Skip installing system package dependencies
-c (optional): Use ccache -c (optional): Use ccache
-z (optional): enable CCP support
-f (optional): Skip fetching dependencies (to test local changes) -f (optional): Skip fetching dependencies (to test local changes)
-h|? Show this help message -h|? Show this help message
EOF EOF
} }
FETCH_DEPENDENCIES=true FETCH_DEPENDENCIES=true
while getopts ":hp:i:msczf" arg; do while getopts ":hp:i:mscf" arg; do
case $arg in case $arg in
p) p)
BUILD_DIR="${OPTARG}" BUILD_DIR="${OPTARG}"
@ -49,9 +48,6 @@ while getopts ":hp:i:msczf" arg; do
c) c)
MVFST_USE_CCACHE=true MVFST_USE_CCACHE=true
;; ;;
z)
MVFST_ENABLE_CCP=true
;;
f) f)
FETCH_DEPENDENCIES=false FETCH_DEPENDENCIES=false
;; ;;
@ -88,12 +84,10 @@ mkdir -p "$MVFST_BUILD_DIR"
if [ -z "${INSTALL_PREFIX-}" ]; then if [ -z "${INSTALL_PREFIX-}" ]; then
FOLLY_INSTALL_DIR=$DEPS_DIR FOLLY_INSTALL_DIR=$DEPS_DIR
FIZZ_INSTALL_DIR=$DEPS_DIR FIZZ_INSTALL_DIR=$DEPS_DIR
LIBCCP_INSTALL_DIR=$DEPS_DIR
MVFST_INSTALL_DIR=$BWD MVFST_INSTALL_DIR=$BWD
else else
FOLLY_INSTALL_DIR=$INSTALL_PREFIX FOLLY_INSTALL_DIR=$INSTALL_PREFIX
FIZZ_INSTALL_DIR=$INSTALL_PREFIX FIZZ_INSTALL_DIR=$INSTALL_PREFIX
LIBCCP_INSTALL_DIR=$INSTALL_PREFIX
MVFST_INSTALL_DIR=$INSTALL_PREFIX MVFST_INSTALL_DIR=$INSTALL_PREFIX
fi fi
@ -354,32 +348,6 @@ function detect_platform() {
echo -e "${COLOR_GREEN}Detected platform: $Platform ${COLOR_OFF}" echo -e "${COLOR_GREEN}Detected platform: $Platform ${COLOR_OFF}"
} }
function setup_libccp() {
LIBCCP_DIR=$DEPS_DIR/libccp
LIBCCP_BUILD_DIR=$LIBCCP_DIR/build/
if [ ! -d "$LIBCCP_DIR" ] ; then
echo -e "${COLOR_GREEN}[ INFO ] Cloning libccp repo ${COLOR_OFF}"
git clone https://github.com/ccp-project/libccp "$LIBCCP_DIR"
fi
#synch_dependency_to_commit "$LIBCCP_DIR" "$MVFST_ROOT_DIR/build/deps/github_hashes/libccp-rev.txt"
echo -e "${COLOR_GREEN}Building libccp ${COLOR_OFF}"
mkdir -p "$LIBCCP_BUILD_DIR"
cd "$LIBCCP_BUILD_DIR" || exit
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo \
-DBUILD_TESTS=OFF \
-DCMAKE_PREFIX_PATH="$LIBCCP_INSTALL_DIR" \
-DCMAKE_INSTALL_PREFIX="$LIBCCP_INSTALL_DIR" \
-DCMAKE_CXX_FLAGS=\D__CPLUSPLUS__=1 \
${CMAKE_EXTRA_ARGS[@]+"${CMAKE_EXTRA_ARGS[@]}"} \
"$LIBCCP_DIR"
make -j "$nproc"
make install
echo -e "${COLOR_GREEN}libccp is installed ${COLOR_OFF}"
cd "$BWD" || exit
}
function setup_rust() { function setup_rust() {
if ! [ -x "$(command -v rustc)" ] || ! [ -x "$(command -v cargo)" ]; then if ! [ -x "$(command -v rustc)" ] || ! [ -x "$(command -v cargo)" ]; then
echo -e "${COLOR_RED}[ ERROR ] Rust not found (required for CCP support).${COLOR_OFF}\n" echo -e "${COLOR_RED}[ ERROR ] Rust not found (required for CCP support).${COLOR_OFF}\n"
@ -398,10 +366,6 @@ setup_googletest
setup_zstd setup_zstd
setup_folly setup_folly
setup_fizz setup_fizz
if [[ -n "${MVFST_ENABLE_CCP-}" ]]; then
setup_libccp
setup_rust
fi
# build mvfst: # build mvfst:
@ -413,9 +377,6 @@ mvfst_cmake_build_args=(
-DBUILD_TESTS=On \ -DBUILD_TESTS=On \
${CMAKE_EXTRA_ARGS[@]+"${CMAKE_EXTRA_ARGS[@]}"} \ ${CMAKE_EXTRA_ARGS[@]+"${CMAKE_EXTRA_ARGS[@]}"} \
) )
if [[ -n "${MVFST_ENABLE_CCP-}" ]]; then
mvfst_cmake_build_args+=(-DCCP_ENABLED=TRUE)
fi
cmake "${mvfst_cmake_build_args[@]}" ../.. cmake "${mvfst_cmake_build_args[@]}" ../..
make -j "$nproc" make -j "$nproc"

View File

@ -26,8 +26,6 @@ std::string_view congestionControlTypeToString(CongestionControlType type) {
return kCongestionControlCopa2Str; return kCongestionControlCopa2Str;
case CongestionControlType::NewReno: case CongestionControlType::NewReno:
return kCongestionControlNewRenoStr; return kCongestionControlNewRenoStr;
case CongestionControlType::CCP:
return kCongestionControlCcpStr;
case CongestionControlType::StaticCwnd: case CongestionControlType::StaticCwnd:
return kCongestionControlStaticCwndStr; return kCongestionControlStaticCwndStr;
case CongestionControlType::None: case CongestionControlType::None:
@ -53,8 +51,6 @@ std::optional<CongestionControlType> congestionControlStrToType(
return quic::CongestionControlType::Copa2; return quic::CongestionControlType::Copa2;
} else if (str == kCongestionControlNewRenoStr) { } else if (str == kCongestionControlNewRenoStr) {
return quic::CongestionControlType::NewReno; return quic::CongestionControlType::NewReno;
} else if (str == kCongestionControlCcpStr) {
return quic::CongestionControlType::CCP;
} else if (str == kCongestionControlStaticCwndStr) { } else if (str == kCongestionControlStaticCwndStr) {
return quic::CongestionControlType::StaticCwnd; return quic::CongestionControlType::StaticCwnd;
} else if (str == kCongestionControlNoneStr) { } else if (str == kCongestionControlNoneStr) {

View File

@ -392,7 +392,6 @@ constexpr std::string_view kCongestionControlBbrTestingStr = "bbr_testing";
constexpr std::string_view kCongestionControlCopaStr = "copa"; constexpr std::string_view kCongestionControlCopaStr = "copa";
constexpr std::string_view kCongestionControlCopa2Str = "copa2"; constexpr std::string_view kCongestionControlCopa2Str = "copa2";
constexpr std::string_view kCongestionControlNewRenoStr = "newreno"; constexpr std::string_view kCongestionControlNewRenoStr = "newreno";
constexpr std::string_view kCongestionControlCcpStr = "ccp";
constexpr std::string_view kCongestionControlStaticCwndStr = "staticcwnd"; constexpr std::string_view kCongestionControlStaticCwndStr = "staticcwnd";
constexpr std::string_view kCongestionControlNoneStr = "none"; constexpr std::string_view kCongestionControlNoneStr = "none";
@ -404,7 +403,6 @@ enum class CongestionControlType : uint8_t {
Copa2, Copa2,
BBR, BBR,
BBRTesting, BBRTesting,
CCP,
StaticCwnd, StaticCwnd,
None, None,
// NOTE: MAX should always be at the end // NOTE: MAX should always be at the end

View File

@ -16,7 +16,6 @@ add_library(
Copa2.cpp Copa2.cpp
NewReno.cpp NewReno.cpp
QuicCubic.cpp QuicCubic.cpp
QuicCCP.cpp
ServerCongestionControllerFactory.cpp ServerCongestionControllerFactory.cpp
SimulatedTBF.cpp SimulatedTBF.cpp
StaticCwndCongestionController.cpp StaticCwndCongestionController.cpp
@ -50,7 +49,6 @@ target_link_libraries(
mvfst_exception mvfst_exception
mvfst_state_machine mvfst_state_machine
mvfst_state_functions mvfst_state_functions
${LIBCCP_LIBRARY}
) )
file( file(
@ -71,4 +69,3 @@ install(
) )
add_subdirectory(test) add_subdirectory(test)
add_subdirectory(third_party/ccp)

View File

@ -33,10 +33,6 @@ DefaultCongestionControllerFactory::makeCongestionController(
case CongestionControlType::NewReno: case CongestionControlType::NewReno:
congestionController = std::make_unique<NewReno>(conn); congestionController = std::make_unique<NewReno>(conn);
break; break;
case CongestionControlType::CCP:
LOG(ERROR)
<< "Default CC Factory cannot make CCP. Falling back to cubic.";
FOLLY_FALLTHROUGH;
case CongestionControlType::Cubic: case CongestionControlType::Cubic:
congestionController = std::make_unique<Cubic>(conn); congestionController = std::make_unique<Cubic>(conn);
break; break;

View File

@ -1,341 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/congestion_control/QuicCCP.h>
#include <quic/congestion_control/CongestionControlFunctions.h>
#include <quic/server/state/ServerStateMachine.h>
#include <quic/state/StateData.h>
#ifdef CCP_ENABLED
#include <ccp/ccp.h>
#include <ccp/ccp_error.h>
#endif
// The ccpDatapath field is only defined when ccp is enabled.
// When ccp is not enabled, we can safely set this to null, because an
// instance of this class cannot be created (will be denied by
// ServerCongestionControllerFactory).
namespace quic {
#ifdef CCP_ENABLED
CCP::CCP(QuicConnectionStateBase& conn)
: conn_(static_cast<QuicServerConnectionState&>(conn)),
endOfRecovery_(folly::none),
inFallback_(false),
fallbackCC_(conn) {
cwndBytes_ = boundedCwnd(
conn.transportSettings.initCwndInMss * conn.udpSendPacketLen,
conn_.udpSendPacketLen,
conn_.transportSettings.maxCwndInMss,
conn_.transportSettings.minCwndInMss);
datapath_ = conn_.ccpDatapath;
struct ccp_datapath_info info = {
.init_cwnd = static_cast<u32>(conn.transportSettings.initCwndInMss),
.mss = static_cast<u32>(conn.udpSendPacketLen),
// the rest is not used at the moment
.src_ip = 0, // conn.peerAddress.getIPAddress().asV4().toLong()
.src_port = 0, // conn.peerAddress.getPort(),
.dst_ip = 0, // where is the destination addr?
.dst_port = 0, // where is the destination addr?
};
if (!datapath_) {
fallback();
return;
}
// Inform CCP about this new connection. The returned ccp_conn_ object
// contains per-connection state and must be passed to all other libccp
// functions regarding this connection. We pass a reference to ourself
// so that it can be pulled out later by CCPReader when it needs to look
// up the correct CCP instance for a given connection id.
ccp_conn_ = ccp_connection_start(datapath_, (void*)this, &info);
if (ccp_conn_ == nullptr) {
fallback();
LOG(ERROR) << "libccp::ccp_connection_start failed\n";
}
}
CCP::~CCP() {
if (ccp_conn_) {
// Inform CCP that this connection has ended and thus it can free related
// state.
ccp_connection_free(datapath_, ccp_conn_->index);
}
}
void CCP::onRemoveBytesFromInflight(uint64_t bytes) {
DCHECK_LE(bytes, conn_.lossState.inflightBytes);
conn_.lossState.inflightBytes -= bytes;
}
void CCP::onPacketSent(const OutstandingPacketWrapper& packet) {
if (std::numeric_limits<uint64_t>::max() - conn_.lossState.inflightBytes <
packet.metadata.encodedSize) {
throw QuicInternalException(
"CCP: inflightBytes overflow", LocalErrorCode::INFLIGHT_BYTES_OVERFLOW);
}
if (inFallback_) {
fallbackCC_.onPacketSent(packet);
} else {
// Fallback algorithm takes care of bytes acked since it shares the same
// conn_
addAndCheckOverflow(
conn_.lossState.inflightBytes, packet.metadata.encodedSize);
}
if (conn_.qLogger) {
conn_.qLogger->addCongestionMetricUpdate(
conn_.lossState.inflightBytes,
getCongestionWindow(),
kCongestionPacketSent);
}
}
void CCP::onAckEvent(const AckEvent& ack) {
DCHECK(ack.largestNewlyAckedPacket.has_value() && !ack.ackedPackets.empty());
// Fallback algorithm takes care of bytes acked since it shares the same conn_
if (!inFallback_) {
onRemoveBytesFromInflight(ack.ackedBytes);
}
// Add latest state to this batch of udpates. See comment regarding
// ccp_primitives struct
struct ccp_primitives* mmt = &ccp_conn_->prims;
mmt->bytes_acked += ack.ackedBytes;
mmt->packets_acked += ack.ackedPackets.size();
for (const auto& packet : ack.ackedPackets) {
if (packet.encodedSize == 0) {
continue;
}
if (packet.lastAckedPacketInfo) {
sendRate_ = Bandwidth(
packet.totalBytesSentThen -
packet.lastAckedPacketInfo->totalBytesSent,
std::chrono::duration_cast<std::chrono::microseconds>(
packet.sentTime - packet.lastAckedPacketInfo->sentTime));
ackRate_ = Bandwidth(
conn_.lossState.totalBytesAcked -
packet.lastAckedPacketInfo->totalBytesAcked,
std::chrono::duration_cast<std::chrono::microseconds>(
ack.ackTime - packet.lastAckedPacketInfo->ackTime));
} else if (ack.ackTime > packet.sentTime) {
sendRate_ = Bandwidth(
packet.encodedSize,
std::chrono::duration_cast<std::chrono::microseconds>(
ack.ackTime - packet.sentTime));
}
}
}
void CCP::onPacketAckOrLoss(
const AckEvent* FOLLY_NULLABLE ackEvent,
const LossEvent* FOLLY_NULLABLE lossEvent) {
// If we are in fallback mode, forward the call to the fallback algorithm.
if (inFallback_) {
fallbackCC_.onPacketAckOrLoss(ackEvent, lossEvent);
}
if (!ccp_conn_) {
return;
}
// If we never connected to ccp in the first place, nothing else to do
// regardless
if (!ccp_conn_) {
return;
}
/**
* Even if we are in fallback, we finish the rest of this function so that we
* can keep the ccp primitives up to date in case connection with CCP is
* restored.
*/
if (lossEvent) {
onLossEvent(*lossEvent);
if (conn_.pacer) {
conn_.pacer->onPacketsLoss();
}
}
if (ackEvent && ackEvent->largestNewlyAckedPacket.hasValue()) {
onAckEvent(*ackEvent);
}
/**
* The ccp_primitives struct contains the list of all statistics ccp wants to
* know about. These are kept as up to date as possible, and fed to ccp_invoke
* every time there's an ack or loss. Whenever ccp_invoke is called, libccp
* internally decides whether or not to batch and send the updates. As the
* caller, this is totally abstracted from us.
*/
struct ccp_primitives* mmt = &ccp_conn_->prims;
mmt->snd_cwnd = cwndBytes_;
mmt->rtt_sample_us = conn_.lossState.srtt.count();
mmt->bytes_in_flight = conn_.lossState.inflightBytes;
mmt->rate_outgoing = sendRate_.normalize();
mmt->rate_incoming = ackRate_.normalize();
mmt->bytes_misordered = 0; // used for TCP SACK
mmt->packets_misordered = 0; // used for TCP SACK
/**
* This is the heart of ccp on the datapath side. It takes the ccp_primitives
* and runs them through the current datapath program. It handles sending
* these in an update to ccp if necessary. It also keeps track of the fallback
* timer internally. If the timer has expired, all calls to ccp_invoke will
* return LIBCCP_FALLBACK_TIMED_OUT until connection with ccp is restored. To
* get a log when this happens (once per datapath rather than per connectoion)
* enable low level logging in libccp directly.
*/
int ret = ccp_invoke(ccp_conn_);
// If this is the first time we've received LIBCCP_FALLBACK_TIMED_OUT, it
// means we need to switch modes, otherwise we're already in fallback and can
// ignore it.
if (ret == LIBCCP_FALLBACK_TIMED_OUT && !inFallback_) {
fallback();
}
// If we're in fallback mode and libccp returned ok, then we can switch out of
// fallback mode.
if (ret == 0 && inFallback_) {
restoreAfterFallback();
}
// If libccp returned an error code other than FALLBACK_TIMED_OUT, it
// indicates an actual problem that we should raise an alert about.
if (ret && ret != LIBCCP_FALLBACK_TIMED_OUT) {
LOG(ERROR) << "libccp::ccp_invoke failed ret=" << ret;
}
// These measurements represent a counter since the last call to ccp_invoke,
// so we need to reset them each time. The other measurements are just the
// most up-to-date view of the statistics, so they can simply be overwritten
// each time.
mmt->bytes_acked = 0;
mmt->packets_acked = 0;
mmt->lost_pkts_sample = 0;
}
void CCP::fallback() {
inFallback_ = true;
// This just starts the fallback alg where we left off so it doesn't need to
// restart all connections at init cwnd again.
fallbackCC_.handoff(cwndBytes_);
}
void CCP::restoreAfterFallback() {
inFallback_ = false;
// Pick up the cwnd where the fallback alg left off.
setCongestionWindow(fallbackCC_.getCongestionWindow());
}
void CCP::onLossEvent(const LossEvent& loss) {
DCHECK(
loss.largestLostPacketNum.hasValue() &&
loss.largestLostSentTime.hasValue());
// Each "lost_pkt" is counted as a loss event in CCP, which will often warrant
// a different response. This logic helps distinguish between multiple lost
// packets within the same event and multiple distinct loss events.
if (!endOfRecovery_ || *endOfRecovery_ < *loss.largestLostSentTime) {
endOfRecovery_ = Clock::now();
ccp_conn_->prims.lost_pkts_sample += loss.lostPackets;
}
// Fallback algorithm takes care of bytes acked since it shares the same conn_
if (!inFallback_) {
onRemoveBytesFromInflight(loss.lostBytes);
}
}
void CCP::setPacingRate(uint64_t rate) noexcept {
pacingRate_ = rate;
if (conn_.pacer) {
conn_.pacer->setPacingRate(rate);
} else {
LOG(ERROR) << "setPacingRate called but pacer is undefined!";
}
}
uint64_t CCP::getWritableBytes() const noexcept {
uint64_t cwndBytes = getCongestionWindow();
return cwndBytes > conn_.lossState.inflightBytes
? cwndBytes - conn_.lossState.inflightBytes
: 0;
}
uint64_t CCP::getCongestionWindow() const noexcept {
uint64_t cwnd = cwndBytes_;
if (inFallback_) {
cwnd = fallbackCC_.getCongestionWindow();
}
return cwnd;
}
void CCP::setCongestionWindow(uint64_t cwnd) noexcept {
cwndBytes_ = cwnd;
}
uint64_t CCP::getBytesInFlight() const noexcept {
return conn_.lossState.inflightBytes;
}
#else
CCP::CCP(QuicConnectionStateBase& conn)
: conn_(static_cast<QuicServerConnectionState&>(conn)),
endOfRecovery_(folly::none),
inFallback_(false),
fallbackCC_(conn) {}
void CCP::onRemoveBytesFromInflight(uint64_t) {}
void CCP::onPacketSent(const OutstandingPacketWrapper&) {}
void CCP::onPacketAckOrLoss(
const AckEvent* FOLLY_NULLABLE,
const LossEvent* FOLLY_NULLABLE) {}
uint64_t CCP::getWritableBytes() const noexcept {
return 0;
}
uint64_t CCP::getCongestionWindow() const noexcept {
return 0;
}
void CCP::setCongestionWindow(uint64_t) noexcept {}
uint64_t CCP::getBytesInFlight() const noexcept {
return 0;
}
void CCP::setPacingRate(uint64_t) noexcept {}
void CCP::onLossEvent(const LossEvent&) {}
void CCP::onAckEvent(const AckEvent&) {}
void CCP::fallback() {}
void CCP::restoreAfterFallback() {}
CCP::~CCP() = default;
#endif
CongestionControlType CCP::type() const noexcept {
return CongestionControlType::CCP;
}
void CCP::setAppIdle(bool, TimePoint) noexcept {
/* unsupported */
}
void CCP::setAppLimited() {
/* unsupported */
}
bool CCP::isAppLimited() const noexcept {
/* unsupported */
return false;
}
} // namespace quic

View File

@ -1,125 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <quic/QuicException.h>
#include <quic/congestion_control/Bandwidth.h>
#include <quic/congestion_control/CongestionController.h>
#include <quic/congestion_control/QuicCubic.h>
#include <quic/server/state/ServerStateMachine.h>
#include <quic/state/AckEvent.h>
#include <quic/state/StateData.h>
#ifdef CCP_ENABLED
#include <ccp/ccp.h>
#endif
#include <limits>
namespace quic {
/**
* The class defines CCP as a congestion control "algorithm", but it really acts
* as a proxy layer between the datapath (mvfst) and CCP. Each instance of this
* class corresponds to a separate connection that is using CCP for congestion
* control.
*
* From the perspective of this class, the behavior is very simple:
* - On each ack or loss, it updates some statistics about the connection (rtt
* etc.)
* - Once in a while, after updating it will batch the last few updates and send
* them to CCP (via IPC).
* - Asynchronously, at any time CCP may send a message (via IPC) telling this
* connection to use a new cwnd or rate. CCPReader receives these messages,
* looks up the correct instance of QuicCCP for that connection, and calls
* setCongestionWindow or setPacingRate. This class does not contain any logic
* for updating the cwnd (or rate) directly, they only change when directed by
* CCP.
*
* The low-level functionality, such as batching updates, serializing them,
* sending IPC, etc. is handled by libccp, which is a helper library provided by
* the CCP authors.
*
* If, for whatever reason, mvfst has not received any responses from CCP for a
* while, we go into "fallback" mode and use cubic instead, by simply proxying
* the acks, losses, and cwnd updates to an internal instance of QuicCubic. If
* connection is re-established with CCP, we leave fallback mode and pick up the
* cwnd where cubic left off.
*
*/
class CCP : public CongestionController {
public:
explicit CCP(QuicConnectionStateBase& conn);
~CCP() override;
void onRemoveBytesFromInflight(uint64_t) override;
void onPacketSent(const OutstandingPacketWrapper& packet) override;
void onPacketAckOrLoss(
const AckEvent* FOLLY_NULLABLE,
const LossEvent* FOLLY_NULLABLE) override;
void onPacketAckOrLoss(
folly::Optional<AckEvent> ack,
folly::Optional<LossEvent> loss) {
onPacketAckOrLoss(ack.get_pointer(), loss.get_pointer());
}
FOLLY_NODISCARD uint64_t getWritableBytes() const noexcept override;
FOLLY_NODISCARD uint64_t getCongestionWindow() const noexcept override;
// Called (indirectly) by CCP when it wants to update the cwnd for this
// connection.
void setCongestionWindow(uint64_t cwnd) noexcept;
void setAppIdle(bool, TimePoint) noexcept override;
void setAppLimited() override;
FOLLY_NODISCARD CongestionControlType type() const noexcept override;
FOLLY_NODISCARD uint64_t getBytesInFlight() const noexcept;
// Called indirectly by CCP when it wants to update the pacing rate for this
// connection.
void setPacingRate(uint64_t rate) noexcept;
FOLLY_NODISCARD bool isAppLimited() const noexcept override;
void getStats(CongestionControllerStats& /*stats*/) const override {}
private:
void onLossEvent(const LossEvent&);
void onAckEvent(const AckEvent&);
// Fallback to in-datapath congestion control impl (eg. because ccp not
// responding)
void fallback();
// Go back to using CCP after a period of fallback
void restoreAfterFallback();
private:
QuicServerConnectionState& conn_;
// The current cwnd for this connection when we are not in fallback mode.
// When we are in fallback mode, we instead use
// fallbackCC_->getCongestionWindow().
uint64_t cwndBytes_;
// The current pacing rate for this connection when we are not in fallback
// mode. When we are in fallback mode, the pacing rate is set by fallbackCC_.
uint64_t pacingRate_{0};
// The per-connection state needed by libccp, initialized by calling
// libccp::ccp_connection_start in the constructor.
struct ccp_connection* ccp_conn_{nullptr};
// The global (per QuicServerWorker) state needed by libccp, retrieved from
// the connection's corresponding QuicServerConnectionState.
struct ccp_datapath* datapath_{nullptr};
// Used to help ensure we don't count the same loss event multiple times.
folly::Optional<TimePoint> endOfRecovery_;
// Current estimate of the send and ack rate, two of the basic statistics sent
// to CCP.
Bandwidth sendRate_, ackRate_;
// Whether or not we are in fallback mode.
bool inFallback_;
// An instance of QuicCubic to use during fallback mode.
Cubic fallbackCC_;
};
} // namespace quic

View File

@ -14,7 +14,6 @@
#include <quic/congestion_control/Copa.h> #include <quic/congestion_control/Copa.h>
#include <quic/congestion_control/Copa2.h> #include <quic/congestion_control/Copa2.h>
#include <quic/congestion_control/NewReno.h> #include <quic/congestion_control/NewReno.h>
#include <quic/congestion_control/QuicCCP.h>
#include <quic/congestion_control/QuicCubic.h> #include <quic/congestion_control/QuicCubic.h>
#include <quic/congestion_control/StaticCwndCongestionController.h> #include <quic/congestion_control/StaticCwndCongestionController.h>
@ -35,15 +34,6 @@ ServerCongestionControllerFactory::makeCongestionController(
case CongestionControlType::NewReno: case CongestionControlType::NewReno:
congestionController = std::make_unique<NewReno>(conn); congestionController = std::make_unique<NewReno>(conn);
break; break;
case CongestionControlType::CCP:
#ifdef CCP_ENABLED
congestionController = std::make_unique<CCP>(conn);
break;
#else
LOG(ERROR)
<< "Server CC Factory cannot make CCP (unless recompiled with -DCCP_ENABLED). Falling back to cubic.";
FOLLY_FALLTHROUGH;
#endif
case CongestionControlType::Cubic: case CongestionControlType::Cubic:
congestionController = std::make_unique<Cubic>(conn); congestionController = std::make_unique<Cubic>(conn);
break; break;

View File

@ -1,159 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <folly/portability/GTest.h>
#include <chrono>
#include <thread>
#define MSS 1232
namespace quic::test {
/*
class QuicCCPTest : public Test {
};
*/
#ifdef CCP_ENABLED
TEST(CCPTest, TestFallback) {
folly::ScopedEventBaseThread evbThread_;
QuicCcpThreadLauncher launcher;
EXPECT_FALSE(launcher.hasLaunched());
evbThread_.getEventBase()->waitUntilRunning();
std::unique_ptr<CCPReader> reader = std::make_unique<CCPReader>();
evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] {
reader->try_initialize(evbThread_.getEventBase(), 0, 0, 0);
auto ret = reader->connect();
EXPECT_LT(ret, 0);
reader->start();
});
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::unique_ptr<CCP> ccp;
QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build());
evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] {
conn.ccpDatapath = reader->getDatapath();
conn.lossState.srtt = 50us;
ccp = std::make_unique<CCP>(conn);
EXPECT_EQ(
ccp->getCongestionWindow(),
conn.transportSettings.initCwndInMss * conn.udpSendPacketLen);
EXPECT_EQ(conn.lossState.inflightBytes, 0);
});
std::this_thread::sleep_for(std::chrono::milliseconds(500));
evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] {
auto packet1 = makeTestingWritePacket(10, MSS, MSS);
ccp->onPacketSent(packet1);
EXPECT_EQ(conn.lossState.inflightBytes, MSS);
auto packet2 = makeTestingWritePacket(20, MSS, MSS);
ccp->onPacketSent(packet2);
EXPECT_EQ(conn.lossState.inflightBytes, MSS * 2);
auto packet3 = makeTestingWritePacket(30, MSS, MSS);
ccp->onPacketSent(packet3);
EXPECT_EQ(conn.lossState.inflightBytes, MSS * 3);
ccp->onPacketAckOrLoss(
makeAck(10, MSS, Clock::now(), packet1.metadata.time), folly::none);
EXPECT_EQ(conn.lossState.inflightBytes, MSS * 2);
ccp->onPacketAckOrLoss(
makeAck(20, MSS, Clock::now(), packet2.metadata.time), folly::none);
EXPECT_EQ(conn.lossState.inflightBytes, MSS * 1);
ccp->onPacketAckOrLoss(
makeAck(30, MSS, Clock::now(), packet3.metadata.time), folly::none);
EXPECT_EQ(conn.lossState.inflightBytes, 0);
});
std::this_thread::sleep_for(std::chrono::milliseconds(500));
evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] {
ccp.reset();
auto manual = reader.release();
delete manual;
});
}
#define xstr(s) str(s)
#define str(s) #s
#define FIXED_CWND_TEST 100
#define INIT_CWND 10
TEST(CCPTest, TestSimple) {
folly::ScopedEventBaseThread evbThread_;
QuicCcpThreadLauncher launcher;
evbThread_.getEventBase()->waitUntilRunning();
std::string ccpConfig =
std::string("constant --cwnd=") + std::string(xstr(FIXED_CWND_TEST));
launcher.start(ccpConfig);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_TRUE(launcher.hasLaunched());
std::unique_ptr<CCPReader> reader = std::make_unique<CCPReader>();
evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] {
reader->try_initialize(
evbThread_.getEventBase(), launcher.getCcpId(), 0, 0);
auto ret = reader->connect();
EXPECT_GE(ret, 0);
reader->start();
});
std::unique_ptr<CCP> ccp;
QuicServerConnectionState conn(
FizzServerQuicHandshakeContext::Builder().build());
evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] {
conn.ccpDatapath = reader->getDatapath();
conn.lossState.srtt = 50us;
ccp = std::make_unique<CCP>(conn);
EXPECT_EQ(ccp->getCongestionWindow(), INIT_CWND * conn.udpSendPacketLen);
});
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] {
auto packet1 = makeTestingWritePacket(10, MSS, MSS);
ccp->onPacketSent(packet1);
auto packet2 = makeTestingWritePacket(20, MSS, MSS);
ccp->onPacketSent(packet2);
auto packet3 = makeTestingWritePacket(30, MSS, MSS);
ccp->onPacketSent(packet3);
EXPECT_EQ(ccp->getCongestionWindow(), INIT_CWND * conn.udpSendPacketLen);
EXPECT_EQ(conn.lossState.inflightBytes, MSS * 3);
ccp->onPacketAckOrLoss(
makeAck(10, MSS, Clock::now(), packet1.metadata.time), folly::none);
EXPECT_EQ(
ccp->getCongestionWindow(), FIXED_CWND_TEST * conn.udpSendPacketLen);
ccp->onPacketAckOrLoss(
makeAck(20, MSS, Clock::now(), packet2.metadata.time), folly::none);
ccp->onPacketAckOrLoss(
makeAck(30, MSS, Clock::now(), packet3.metadata.time), folly::none);
EXPECT_EQ(conn.lossState.inflightBytes, 0);
EXPECT_EQ(
ccp->getCongestionWindow(), FIXED_CWND_TEST * conn.udpSendPacketLen);
});
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
evbThread_.getEventBase()->runInEventBaseThreadAndWait([&] {
ccp.reset();
auto manual = reader.release();
delete manual;
});
launcher.stop();
}
#endif
} // namespace quic::test

View File

@ -1 +0,0 @@
Cargo.lock

View File

@ -1,47 +0,0 @@
# The core CCP library is written in rust, thus we need to use cargo to build it.
# This file externally calls out to cargo and then sets some variables to tell cmake where to find the generated library.
if(DEFINED CCP_ENABLED)
# choose cargo build mode based on cmake build mode
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
# always need to build with nightly because portus (ccp rust library) requires it (at the moment)
set(CARGO_CMD cargo +nightly build --lib)
set(TARGET_DIR "debug")
else ()
# always need to build with nightly because portus (ccp rust library) requires it (at the moment)
set(CARGO_CMD cargo +nightly build --lib --release)
set(TARGET_DIR "release")
endif ()
# cargo will place the library here
set(CARGO_TARGET_LOC "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_DIR}/libstartccp.a")
# but cmake likes the library to be here (i.e. no ${TARGET_DIR})
set(CMAKE_TARGET_LOC "${CMAKE_CURRENT_BINARY_DIR}/libstartccp.a")
# since cmake isn't building ccp directly (cargo is), it doesn't know which source files it depends on
# (and thus when to rebuild it, so we need to specify it manually below with "DEPENDS"...)
# the set of source files is any rust or Cargo.toml file in quic/congestion_control/third_party
file(GLOB_RECURSE ALG_SRC ${CMAKE_CURRENT_SOURCE_DIR}/../ *.rs Cargo.toml)
message(STATUS "found alg files: ${ALG_SRC}")
# run cargo to actually build the library,
# then copy the library to where cmake wants it to be
add_custom_command(
OUTPUT ${CMAKE_TARGET_LOC}
COMMAND CARGO_TARGET_DIR=${CMAKE_CURRENT_BINARY_DIR} ${CARGO_CMD}
COMMAND cp ${CARGO_TARGET_LOC} ${CMAKE_TARGET_LOC}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS ${ALG_SRC}
COMMENT "Building CCP algorithms..."
VERBATIM
)
# we make a target that depends on the output of cargo (as opposed to generating it here directly) so that cmake
# doesn't re-run cargo every time
add_custom_target(libstartccp DEPENDS ${CMAKE_TARGET_LOC})
# each cmakelists.txt has its own scope, so we can't access CMAKE_TARGET_LOC in the scope where we need to use it as a dependency
# so instead, we set it as a property of this target, which we can lookup later using get_target_properties()
set_target_properties(libstartccp PROPERTIES LOCATION ${CMAKE_TARGET_LOC})
endif()

View File

@ -1,18 +0,0 @@
[package]
name = "startccp"
version = "0.1.0"
edition = "2021"
[lib]
name = "startccp"
crate-type = ["staticlib", "cdylib"]
[dependencies]
libc = "0.2"
clap = "2.32"
portus = { git = "https://github.com/ccp-project/portus", rev = "ef2ddb674785e1f0c0869fe648896ac866659ab3" }
portus_export = { git = "https://github.com/ccp-project/portus", rev = "ef2ddb674785e1f0c0869fe648896ac866659ab3" }
[profile.release]
opt-level = 3
lto = true

View File

@ -1,91 +0,0 @@
extern crate ccp_const;
extern crate clap;
extern crate portus;
extern crate slog;
use ccp_const::CcpConstAlg;
use clap::Arg;
use slog::warn;
const MBPS_TO_BPS: u32 = 1_000_000;
const BITS_TO_BYTES: u32 = 8;
const PKTS_TO_BYTES: u32 = 1500;
fn main() {
let log = portus::algs::make_logger();
let (cfg, ipc) = || -> Result<(CcpConstAlg, String), String> {
let matches = clap::App::new("CCP Constant Cwnd/Rate")
.version("0.1.0")
.author("<ccp@csail.mit.edu>")
.about("Congestion control algorithm which sets a constant rate or cwnd")
.arg(
Arg::with_name("ipc")
.long("ipc")
.help("Sets the type of ipc to use: (netlink|unix)")
.takes_value(true)
.required(true)
.validator(portus::algs::ipc_valid),
)
.arg(
Arg::with_name("cwnd")
.long("cwnd")
.takes_value(true)
.help("Sets the congestion window, in packets."),
)
.arg(
Arg::with_name("rate")
.long("rate")
.takes_value(true)
.help("Sets the rate to use, in Mbps, must also specify cwnd_cap"),
)
.arg(
Arg::with_name("cwnd_cap")
.long("cwnd_cap")
.takes_value(true)
.help("The max cwnd, in packets, *only* when setting a rate"),
)
.group(
clap::ArgGroup::with_name("to_set")
.args(&["cwnd", "rate"])
.required(true),
)
.get_matches();
if !matches.is_present("to_set") {
return Err(String::from("must supply either cwnd or rate"));
}
let const_param = if matches.is_present("rate") {
let rate = u32::from_str_radix(matches.value_of("rate").unwrap(), 10)
.map_err(|e| format!("{:?}", e))?;
if !matches.is_present("cwnd_cap") {
return Err(String::from("when using rate, must also specify cwnd_cap"));
}
let cwnd_cap = u32::from_str_radix(matches.value_of("cwnd_cap").unwrap(), 10)
.map_err(|e| format!("{:?}", e))?;
let rate = rate * MBPS_TO_BPS / BITS_TO_BYTES;
let cwnd_cap = cwnd_cap * PKTS_TO_BYTES;
Ok(ccp_const::Constant::Rate { rate, cwnd_cap })
} else if matches.is_present("cwnd") {
let cwnd = u32::from_str_radix(matches.value_of("cwnd").unwrap(), 10)
.map_err(|e| format!("{:?}", e))?;
let cwnd = cwnd * PKTS_TO_BYTES;
Ok(ccp_const::Constant::Cwnd(cwnd))
} else {
Err(String::from("must supply either cwnd or rate"))
}?;
Ok((
CcpConstAlg {
logger: Some(log.clone()),
const_param,
},
String::from(matches.value_of("ipc").unwrap()),
))
}()
.map_err(|e| warn!(log, "bad argument"; "err" => ?e))
.unwrap();
portus::start!(ipc.as_str(), Some(log), cfg).unwrap()
}

View File

@ -1,201 +0,0 @@
extern crate clap;
extern crate portus;
extern crate portus_export;
extern crate slog;
use portus::ipc::Ipc;
use portus::lang::Scope;
use portus::{CongAlg, CongAlgBuilder, Datapath, DatapathInfo, DatapathTrait, Report};
use slog::{debug, warn};
use std::collections::HashMap;
#[derive(Clone, Copy)]
pub enum Constant {
Cwnd(u32),
Rate { rate: u32, cwnd_cap: u32 },
}
#[portus_export::register_ccp_alg]
pub struct CcpConstAlg {
pub logger: Option<slog::Logger>,
pub const_param: Constant,
}
pub struct CcpConstFlow<T: Ipc> {
logger: Option<slog::Logger>,
sc: Scope,
control_channel: Datapath<T>,
const_param: Constant,
mss: u32,
}
impl<I: Ipc> CongAlg<I> for CcpConstAlg {
type Flow = CcpConstFlow<I>;
fn name() -> &'static str {
"constant"
}
fn datapath_programs(&self) -> HashMap<&'static str, String> {
let mut h = HashMap::default();
h.insert(
"constant",
"
(def (Report
(volatile rtt 0)
(volatile rin 0)
(volatile rout 0)
(volatile loss 0)
))
(when true
(:= Report.rtt Flow.rtt_sample_us)
(:= Report.rin Flow.rate_outgoing)
(:= Report.rout Flow.rate_incoming)
(:= Report.loss (+ Report.loss Ack.lost_pkts_sample))
(fallthrough)
)
(when (> Micros 1000000)
(report)
(:= Micros 0)
)"
.to_owned(),
);
h
}
fn new_flow(&self, mut control: Datapath<I>, info: DatapathInfo) -> Self::Flow {
let params = match self.const_param {
Constant::Cwnd(c) => vec![("Cwnd", c * info.mss)],
Constant::Rate {
rate: r,
cwnd_cap: c,
} => vec![("Cwnd", c * info.mss), ("Rate", r)],
};
let sc = control.set_program("constant", Some(&params)).unwrap();
CcpConstFlow {
logger: self.logger.clone(),
sc,
control_channel: control,
const_param: self.const_param,
mss: info.mss,
}
}
}
use clap::Arg;
const MBPS_TO_BPS: u32 = 1_000_000;
const BITS_TO_BYTES: u32 = 8;
//const PKTS_TO_BYTES: u32 = 1500;
impl<'a, 'b> CongAlgBuilder<'a, 'b> for CcpConstAlg {
fn args() -> clap::App<'a, 'b> {
clap::App::new("CCP Constant Cwnd/Rate")
.version("0.2.0")
.author("<ccp@csail.mit.edu>")
.about("Congestion control algorithm which sets a constant rate or cwnd")
.arg(
Arg::with_name("cwnd")
.long("cwnd")
.takes_value(true)
.help("Sets the congestion window, in packets."),
)
.arg(
Arg::with_name("rate")
.long("rate")
.takes_value(true)
.help("Sets the rate to use, in Mbps, must also specify cwnd_cap"),
)
.arg(
Arg::with_name("cwnd_cap")
.long("cwnd_cap")
.takes_value(true)
.help("The max cwnd, in MSS packets, *only* when setting a rate"),
)
.group(
clap::ArgGroup::with_name("to_set")
.args(&["cwnd", "rate"])
.required(true),
)
}
fn with_arg_matches(
args: &clap::ArgMatches,
logger: Option<slog::Logger>,
) -> Result<Self, portus::Error> {
if !args.is_present("to_set") {
return Err(portus::Error(String::from(
"must supply either cwnd or rate",
)));
}
let const_param = if args.is_present("rate") {
let rate = u32::from_str_radix(args.value_of("rate").unwrap(), 10)
.map_err(|e| portus::Error(e.to_string()))?;
if !args.is_present("cwnd_cap") {
return Err(portus::Error(String::from(
"when using rate, must also specify cwnd_cap",
)));
}
let cwnd_cap = u32::from_str_radix(args.value_of("cwnd_cap").unwrap(), 10)
.map_err(|e| portus::Error(e.to_string()))?;
let rate = rate * MBPS_TO_BPS / BITS_TO_BYTES;
let cwnd_cap = cwnd_cap;
Ok(Constant::Rate { rate, cwnd_cap })
} else if args.is_present("cwnd") {
let cwnd = u32::from_str_radix(args.value_of("cwnd").unwrap(), 10)
.map_err(|e| portus::Error(e.to_string()))?;
let cwnd = cwnd;
Ok(Constant::Cwnd(cwnd))
} else {
Err(portus::Error(String::from(
"must supply either cwnd or rate",
)))
}?;
Ok(Self {
logger,
const_param,
})
}
}
impl<T: Ipc> portus::Flow for CcpConstFlow<T> {
fn on_report(&mut self, sock_id: u32, m: Report) {
let rtt = m
.get_field("Report.rtt", &self.sc)
.expect("expected rtt in report") as u32;
let rin = m
.get_field("Report.rin", &self.sc)
.expect("expected rin in report") as u32;
let rout = m
.get_field("Report.rout", &self.sc)
.expect("expected rout in report") as u32;
let loss = m
.get_field("Report.loss", &self.sc)
.expect("expected loss in report") as u32;
self.logger.as_ref().map(|log| {
debug!(log, "report";
"sid" => sock_id,
"rtt(us)" => rtt,
"rin(Bps)" => rin,
"rout(Bps)" => rout,
"loss(pkts)" => loss,
);
});
let update = match self.const_param {
Constant::Cwnd(c) => vec![("Cwnd", c * self.mss)],
Constant::Rate {
rate: r,
cwnd_cap: c,
} => vec![("Cwnd", c * self.mss), ("Rate", r)],
};
if let Err(e) = self.control_channel.update_field(&self.sc, &update) {
self.logger.as_ref().map(|log| {
warn!(log, "rate update error"; "err" => ?e,);
});
}
}
}

View File

@ -1,24 +0,0 @@
extern crate clap;
extern crate time;
#[macro_use]
extern crate slog;
extern crate generic_cong_avoid;
extern crate portus;
use generic_cong_avoid::cubic::Cubic;
fn main() {
let log = portus::algs::make_logger();
let (alg, ipc) = generic_cong_avoid::make_args("CCP Cubic", log.clone())
.map_err(|e| warn!(log, "bad argument"; "err" => ?e))
.unwrap();
info!(log, "initializing";
"reports" => ?alg.report_option,
"slow_start_mode" => ?alg.ss,
);
generic_cong_avoid::start::<Cubic>(ipc.as_str(), log, alg);
}

View File

@ -1,24 +0,0 @@
extern crate clap;
extern crate time;
#[macro_use]
extern crate slog;
extern crate generic_cong_avoid;
extern crate portus;
use generic_cong_avoid::reno::Reno;
fn main() {
let log = portus::algs::make_logger();
let (alg, ipc) = generic_cong_avoid::make_args("CCP Reno", log.clone())
.map_err(|e| warn!(log, "bad argument"; "err" => ?e))
.unwrap();
info!(log, "initializing";
"reports" => ?alg.report_option,
"slow_start_mode" => ?alg.ss,
);
generic_cong_avoid::start::<Reno>(ipc.as_str(), log, alg);
}

View File

@ -1,91 +0,0 @@
use clap::Arg;
use {
Alg, GenericCongAvoidAlg, GenericCongAvoidConfigReport, GenericCongAvoidConfigSS,
DEFAULT_SS_THRESH,
};
pub fn make_args<A: GenericCongAvoidAlg>(
name: &str,
logger: impl Into<Option<slog::Logger>>,
) -> Result<(Alg<A>, String), std::num::ParseIntError> {
let ss_thresh_default = format!("{}", DEFAULT_SS_THRESH);
let matches = clap::App::new(name)
.version("0.2.0")
.author("Akshay Narayan <akshayn@mit.edu>")
.about("CCP implementation of a congestion avoidance algorithm")
.arg(Arg::with_name("ipc")
.long("ipc")
.help("Sets the type of ipc to use: (netlink|unix)")
.default_value("unix")
.validator(portus::algs::ipc_valid))
.arg(Arg::with_name("init_cwnd")
.long("init_cwnd")
.help("Sets the initial congestion window, in bytes. Setting 0 will use datapath default.")
.default_value("0"))
.arg(Arg::with_name("ss_thresh")
.long("ss_thresh")
.help("Sets the slow start threshold, in bytes")
.default_value(&ss_thresh_default))
.arg(Arg::with_name("ss_in_fold")
.long("ss_in_fold")
.help("Implement slow start in the datapath"))
.arg(Arg::with_name("report_per_ack")
.long("per_ack")
.help("Specifies that the datapath should send a measurement upon every ACK"))
.arg(Arg::with_name("report_per_interval")
.long("report_interval_ms")
.short("i")
.takes_value(true))
.group(clap::ArgGroup::with_name("interval")
.args(&["report_per_ack", "report_per_interval"])
.required(false))
.arg(Arg::with_name("compensate_update")
.long("compensate_update")
.help("Scale the congestion window update during slow start to compensate for reporting delay"))
.arg(Arg::with_name("deficit_timeout")
.long("deficit_timeout")
.default_value("0")
.help("Number of RTTs to wait after a loss event to allow further CWND reductions. \
Default 0 means CWND deficit counting is enforced strictly with no timeout."))
.args(&A::args())
.get_matches();
let ipc = String::from(matches.value_of("ipc").unwrap());
Ok((
Alg {
ss_thresh: u32::from_str_radix(matches.value_of("ss_thresh").unwrap(), 10)?,
init_cwnd: u32::from_str_radix(matches.value_of("init_cwnd").unwrap(), 10)?,
report_option: if matches.is_present("report_per_ack") {
GenericCongAvoidConfigReport::Ack
} else if matches.is_present("report_per_interval") {
GenericCongAvoidConfigReport::Interval(time::Duration::milliseconds(
matches
.value_of("report_per_interval")
.unwrap()
.parse()
.unwrap(),
))
} else {
GenericCongAvoidConfigReport::Rtt
},
ss: if matches.is_present("ss_in_fold") {
GenericCongAvoidConfigSS::Datapath
} else {
GenericCongAvoidConfigSS::Ccp
},
use_compensation: matches.is_present("compensate_update"),
deficit_timeout: u32::from_str_radix(matches.value_of("deficit_timeout").unwrap(), 10)?,
logger: logger.into(),
alg: A::with_args(&matches),
},
ipc,
))
}
pub fn start<A: GenericCongAvoidAlg>(ipc: &str, log: slog::Logger, alg: Alg<A>)
where
A: 'static,
{
portus::start!(ipc, Some(log), alg).unwrap()
}

View File

@ -1,167 +0,0 @@
extern crate portus_export;
extern crate slog;
extern crate time;
pub use crate::GenericCongAvoidAlg;
pub use crate::GenericCongAvoidFlow;
pub use crate::GenericCongAvoidMeasurements;
#[derive(Default)]
pub struct Cubic {
pkt_size: u32,
init_cwnd: u32,
cwnd: f64,
cwnd_cnt: f64,
tcp_friendliness: bool,
beta: f64,
fast_convergence: bool,
c: f64,
wlast_max: f64,
epoch_start: f64,
origin_point: f64,
d_min: f64,
wtcp: f64,
k: f64,
ack_cnt: f64,
cnt: f64,
cubic_rtt: f64,
}
impl Cubic {
fn cubic_update(&mut self) {
self.ack_cnt += 1.0;
if self.epoch_start <= 0.0 {
self.epoch_start =
(time::get_time().sec as f64) + f64::from(time::get_time().nsec) / 1_000_000_000.0;
if self.cwnd < self.wlast_max {
let temp = (self.wlast_max - self.cwnd) / self.c;
self.k = (temp.max(0.0)).powf(1.0 / 3.0);
self.origin_point = self.wlast_max;
} else {
self.k = 0.0;
self.origin_point = self.cwnd;
}
self.ack_cnt = 1.0;
self.wtcp = self.cwnd
}
let t = (time::get_time().sec as f64)
+ f64::from(time::get_time().nsec) / 1_000_000_000.0
+ self.d_min
- self.epoch_start;
let target = self.origin_point + self.c * ((t - self.k) * (t - self.k) * (t - self.k));
if target > self.cwnd {
self.cnt = self.cwnd / (target - self.cwnd);
} else {
self.cnt = 100.0 * self.cwnd;
}
if self.tcp_friendliness {
self.cubic_tcp_friendliness();
}
}
fn cubic_tcp_friendliness(&mut self) {
self.wtcp += ((3.0 * self.beta) / (2.0 - self.beta)) * (self.ack_cnt / self.cwnd);
self.ack_cnt = 0.0;
if self.wtcp > self.cwnd {
let max_cnt = self.cwnd / (self.wtcp - self.cwnd);
if self.cnt > max_cnt {
self.cnt = max_cnt;
}
}
}
fn cubic_reset(&mut self) {
self.wlast_max = 0.0;
self.epoch_start = -0.1;
self.origin_point = 0.0;
self.d_min = -0.1;
self.wtcp = 0.0;
self.k = 0.0;
self.ack_cnt = 0.0;
}
}
impl GenericCongAvoidAlg for Cubic {
type Flow = Self;
fn name() -> &'static str {
"cubic"
}
fn with_args(_: &clap::ArgMatches) -> Self {
Default::default()
}
fn new_flow(&self, _logger: Option<slog::Logger>, init_cwnd: u32, mss: u32) -> Self::Flow {
Cubic {
pkt_size: mss,
init_cwnd: init_cwnd / mss,
cwnd: f64::from(init_cwnd / mss),
cwnd_cnt: 0.0f64,
tcp_friendliness: true,
beta: 0.3f64,
fast_convergence: true,
c: 0.4f64,
wlast_max: 0.0f64,
epoch_start: -0.1f64,
origin_point: 0.0f64,
d_min: -0.1f64,
wtcp: 0.0f64,
k: 0.0f64,
ack_cnt: 0.0f64,
cnt: 0.0f64,
cubic_rtt: 0.1f64,
}
}
}
impl GenericCongAvoidFlow for Cubic {
fn curr_cwnd(&self) -> u32 {
(self.cwnd * f64::from(self.pkt_size)) as u32
}
fn set_cwnd(&mut self, cwnd: u32) {
self.cwnd = f64::from(cwnd) / f64::from(self.pkt_size);
}
fn increase(&mut self, m: &GenericCongAvoidMeasurements) {
self.cubic_rtt = (f64::from(m.rtt)) * 0.000_001;
let f_rtt = (f64::from(m.rtt)) * 0.000_001;
let no_of_acks = ((f64::from(m.acked)) / (f64::from(self.pkt_size))) as u32;
for _i in 0..no_of_acks {
if self.d_min <= 0.0 || f_rtt < self.d_min {
self.d_min = f_rtt;
}
self.cubic_update();
if self.cwnd_cnt > self.cnt {
self.cwnd += 1.0;
self.cwnd_cnt = 0.0;
} else {
self.cwnd_cnt += 1.0;
}
}
}
fn reduction(&mut self, _m: &GenericCongAvoidMeasurements) {
self.epoch_start = -0.1;
if self.cwnd < self.wlast_max && self.fast_convergence {
self.wlast_max = self.cwnd * ((2.0 - self.beta) / 2.0);
} else {
self.wlast_max = self.cwnd;
}
self.cwnd *= 1.0 - self.beta;
if self.cwnd as u32 <= self.init_cwnd {
self.cwnd = f64::from(self.init_cwnd);
}
}
fn reset(&mut self) {
self.cubic_reset();
}
}

View File

@ -1,601 +0,0 @@
extern crate time;
#[macro_use]
extern crate slog;
extern crate clap;
extern crate portus;
use portus::ipc::Ipc;
use portus::lang::Scope;
use portus::{CongAlg, CongAlgBuilder, Datapath, DatapathInfo, DatapathTrait, Report};
use std::collections::HashMap;
pub mod cubic;
pub mod reno;
//mod bin_helper;
//pub use bin_helper::{make_args, start};
pub const DEFAULT_SS_THRESH: u32 = 0x7fff_ffff;
pub const DEFAULT_SS_THRESH_STR: &'static str = "2147483647";
pub struct GenericCongAvoidMeasurements {
pub acked: u32,
pub was_timeout: bool,
pub sacked: u32,
pub loss: u32,
pub rtt: u32,
pub inflight: u32,
}
#[derive(Debug, Clone, Copy)]
pub enum GenericCongAvoidConfigReport {
Ack,
Rtt,
Interval(time::Duration),
}
#[derive(Debug, Clone, Copy)]
pub enum GenericCongAvoidConfigSS {
Datapath,
Ccp,
}
pub trait GenericCongAvoidFlow {
fn curr_cwnd(&self) -> u32;
fn set_cwnd(&mut self, cwnd: u32);
fn increase(&mut self, m: &GenericCongAvoidMeasurements);
fn reduction(&mut self, m: &GenericCongAvoidMeasurements);
fn reset(&mut self) {}
}
pub trait GenericCongAvoidAlg {
type Flow: GenericCongAvoidFlow;
fn name() -> &'static str;
fn args<'a, 'b>() -> Vec<clap::Arg<'a, 'b>> {
vec![]
}
fn with_args(matches: &clap::ArgMatches) -> Self;
fn new_flow(&self, logger: Option<slog::Logger>, init_cwnd: u32, mss: u32) -> Self::Flow;
}
pub struct Alg<A: GenericCongAvoidAlg> {
pub deficit_timeout: u32,
pub init_cwnd: u32,
pub report_option: GenericCongAvoidConfigReport,
pub ss: GenericCongAvoidConfigSS,
pub ss_thresh: u32,
pub use_compensation: bool,
pub logger: Option<slog::Logger>,
pub alg: A,
}
impl<T: Ipc, A: GenericCongAvoidAlg> CongAlg<T> for Alg<A> {
type Flow = Flow<T, A::Flow>;
fn name() -> &'static str {
A::name()
}
fn datapath_programs(&self) -> HashMap<&'static str, String> {
let mut h = HashMap::default();
h.insert(
"DatapathIntervalProg",
"
(def
(Report
(volatile acked 0)
(volatile sacked 0)
(volatile loss 0)
(volatile timeout false)
(volatile rtt 0)
(volatile inflight 0)
)
(reportTime 0)
)
(when true
(:= Report.inflight Flow.bytes_in_flight)
(:= Report.rtt Flow.rtt_sample_us)
(:= Report.acked (+ Report.acked Ack.bytes_acked))
(:= Report.sacked (+ Report.sacked Ack.packets_misordered))
(:= Report.loss Ack.lost_pkts_sample)
(:= Report.timeout Flow.was_timeout)
(fallthrough)
)
(when (|| Report.timeout (> Report.loss 0))
(report)
(:= Micros 0)
)
(when (> Micros reportTime)
(report)
(:= Micros 0)
)
"
.to_string(),
);
h.insert(
"DatapathIntervalRTTProg",
"
(def (Report
(volatile acked 0)
(volatile sacked 0)
(volatile loss 0)
(volatile timeout false)
(volatile rtt 0)
(volatile inflight 0)
))
(when true
(:= Report.inflight Flow.bytes_in_flight)
(:= Report.rtt Flow.rtt_sample_us)
(:= Report.acked (+ Report.acked Ack.bytes_acked))
(:= Report.sacked (+ Report.sacked Ack.packets_misordered))
(:= Report.loss Ack.lost_pkts_sample)
(:= Report.timeout Flow.was_timeout)
(fallthrough)
)
(when (|| Report.timeout (> Report.loss 0))
(report)
(:= Micros 0)
)
(when (> Micros Flow.rtt_sample_us)
(report)
(:= Micros 0)
)
"
.to_string(),
);
h.insert(
"AckUpdateProg",
"
(def (Report
(volatile acked 0)
(volatile sacked 0)
(volatile loss 0)
(volatile timeout false)
(volatile rtt 0)
(volatile inflight 0)
))
(when true
(:= Report.acked (+ Report.acked Ack.bytes_acked))
(:= Report.sacked (+ Report.sacked Ack.packets_misordered))
(:= Report.loss Ack.lost_pkts_sample)
(:= Report.timeout Flow.was_timeout)
(:= Report.rtt Flow.rtt_sample_us)
(:= Report.inflight Flow.bytes_in_flight)
(report)
)
"
.to_string(),
);
h.insert(
"SSUpdateProg",
"
(def (Report
(volatile acked 0)
(volatile sacked 0)
(volatile loss 0)
(volatile timeout false)
(volatile rtt 0)
(volatile inflight 0)
))
(when true
(:= Report.acked (+ Report.acked Ack.bytes_acked))
(:= Report.sacked (+ Report.sacked Ack.packets_misordered))
(:= Report.loss Ack.lost_pkts_sample)
(:= Report.timeout Flow.was_timeout)
(:= Report.rtt Flow.rtt_sample_us)
(:= Report.inflight Flow.bytes_in_flight)
(:= Cwnd (+ Cwnd Ack.bytes_acked))
(fallthrough)
)
(when (|| Report.timeout (> Report.loss 0))
(report)
)
"
.to_string(),
);
h
}
fn new_flow(&self, control: Datapath<T>, info: DatapathInfo) -> Self::Flow {
let init_cwnd = if self.init_cwnd != 0 {
self.init_cwnd
} else {
info.init_cwnd
};
// hack for now to deal with inconsistent units (init_cwnd should be in bytes, but is in pkts)
// if init_cwnd < info.mss {
// init_cwnd = init_cwnd * info.mss;
// }
let mut s = Flow {
control_channel: control,
logger: self.logger.clone(),
report_option: self.report_option,
sc: Default::default(),
ss_thresh: self.ss_thresh,
rtt: 0,
in_startup: false,
mss: info.mss,
use_compensation: self.use_compensation,
deficit_timeout: self.deficit_timeout,
init_cwnd,
curr_cwnd_reduction: 0,
last_cwnd_reduction: time::now().to_timespec() - time::Duration::milliseconds(500),
alg: self.alg.new_flow(self.logger.clone(), init_cwnd, info.mss),
};
match (self.ss, self.report_option) {
(GenericCongAvoidConfigSS::Datapath, _) => {
s.sc = s.install_ss_update();
s.in_startup = true;
}
(GenericCongAvoidConfigSS::Ccp, GenericCongAvoidConfigReport::Ack) => {
s.sc = s.install_ack_update();
}
(GenericCongAvoidConfigSS::Ccp, GenericCongAvoidConfigReport::Rtt) => {
s.sc = s.install_datapath_interval_rtt();
}
(GenericCongAvoidConfigSS::Ccp, GenericCongAvoidConfigReport::Interval(i)) => {
s.sc = s.install_datapath_interval(i);
}
}
//self.logger.as_ref().map(|log| {
// debug!(log, "setting initial cwnd";
// "curr_cwnd (bytes)" => s.alg.curr_cwnd(),
// "mss" => s.mss,
// );
//});
s.update_cwnd();
// eprintln!("sleeping...");
// std::thread::sleep(std::time::Duration::from_millis(1500));
// eprintln!("awake!");
s
}
}
use clap::Arg;
impl<'a, 'b, A: GenericCongAvoidAlg> CongAlgBuilder<'a, 'b> for Alg<A> {
fn args() -> clap::App<'a, 'b> {
clap::App::new("CCP generic congestion avoidance")
.version("0.4.0")
.author("CCP Project <ccp@csail.mit.edu>")
.about("CCP implementation of a congestion avoidance algorithm")
.arg(Arg::with_name("ipc")
.long("ipc")
.help("Sets the type of ipc to use: (netlink|unix)")
.default_value("unix")
.validator(portus::algs::ipc_valid))
.arg(Arg::with_name("init_cwnd")
.long("init_cwnd")
.help("Sets the initial congestion window, in bytes. Setting 0 will use datapath default.")
.default_value("0"))
.arg(Arg::with_name("ss_thresh")
.long("ss_thresh")
.help("Sets the slow start threshold, in bytes")
.default_value(&DEFAULT_SS_THRESH_STR))
.arg(Arg::with_name("ss_in_fold")
.long("ss_in_fold")
.help("Implement slow start in the datapath"))
.arg(Arg::with_name("report_per_ack")
.long("per_ack")
.help("Specifies that the datapath should send a measurement upon every ACK"))
.arg(Arg::with_name("report_per_interval")
.long("report_interval_ms")
.short("i")
.takes_value(true))
.group(clap::ArgGroup::with_name("interval")
.args(&["report_per_ack", "report_per_interval"])
.required(false))
.arg(Arg::with_name("compensate_update")
.long("compensate_update")
.help("Scale the congestion window update during slow start to compensate for reporting delay"))
.arg(Arg::with_name("deficit_timeout")
.long("deficit_timeout")
.default_value("0")
.help("Number of RTTs to wait after a loss event to allow further CWND reductions. \
Default 0 means CWND deficit counting is enforced strictly with no timeout."))
.args(&A::args())
}
fn with_arg_matches(
args: &clap::ArgMatches,
logger: Option<slog::Logger>,
) -> Result<Self, portus::Error> {
Ok(Self {
ss_thresh: u32::from_str_radix(args.value_of("ss_thresh").unwrap(), 10)?,
init_cwnd: u32::from_str_radix(args.value_of("init_cwnd").unwrap(), 10)?,
report_option: if args.is_present("report_per_ack") {
GenericCongAvoidConfigReport::Ack
} else if args.is_present("report_per_interval") {
GenericCongAvoidConfigReport::Interval(time::Duration::milliseconds(
args.value_of("report_per_interval")
.unwrap()
.parse()
.unwrap(),
))
} else {
GenericCongAvoidConfigReport::Rtt
},
ss: if args.is_present("ss_in_fold") {
GenericCongAvoidConfigSS::Datapath
} else {
GenericCongAvoidConfigSS::Ccp
},
use_compensation: args.is_present("compensate_update"),
deficit_timeout: u32::from_str_radix(args.value_of("deficit_timeout").unwrap(), 10)?,
logger: logger.into(),
alg: A::with_args(args),
})
}
}
pub struct Flow<T: Ipc, A: GenericCongAvoidFlow> {
alg: A,
deficit_timeout: u32,
init_cwnd: u32,
report_option: GenericCongAvoidConfigReport,
ss_thresh: u32,
use_compensation: bool,
control_channel: Datapath<T>,
logger: Option<slog::Logger>,
curr_cwnd_reduction: u32,
last_cwnd_reduction: time::Timespec,
in_startup: bool,
mss: u32,
rtt: u32,
sc: Scope,
}
impl<I: Ipc, A: GenericCongAvoidFlow> portus::Flow for Flow<I, A> {
fn on_report(&mut self, sock_id: u32, m: Report) {
let mut ms = self.get_fields(&m);
self.logger.as_ref().map(|log| {
debug!(log, "got ack";
"acked(pkts)" => ms.acked / self.mss,
"curr_cwnd (pkts)" => self.alg.curr_cwnd() / self.mss,
"inflight (pkts)" => ms.inflight / self.mss,
"loss" => ms.loss,
"ssthresh" => self.ss_thresh,
"rtt" => ms.rtt,
"sid" => sock_id,
);
});
if self.in_startup {
// install new fold
match self.report_option {
GenericCongAvoidConfigReport::Ack => {
self.sc = self.install_ack_update();
}
GenericCongAvoidConfigReport::Rtt => {
self.sc = self.install_datapath_interval_rtt();
}
GenericCongAvoidConfigReport::Interval(i) => {
self.sc = self.install_datapath_interval(i);
}
}
self.alg.set_cwnd(ms.inflight); // * self.mss);
self.in_startup = false;
}
self.rtt = ms.rtt;
if ms.was_timeout {
self.handle_timeout();
return;
}
ms.acked = self.slow_start_increase(ms.acked);
// increase the cwnd corresponding to new in-order cumulative ACKs
self.alg.increase(&ms);
self.maybe_reduce_cwnd(&ms);
if self.curr_cwnd_reduction > 0 {
//self.logger.as_ref().map(|log| {
// debug!(log, "cwnd re";
// "cwnd" => self.alg.curr_cwnd() / self.mss,
// "acked" => ms.acked / self.mss,
// "deficit" => self.curr_cwnd_reduction);
//});
return;
}
self.update_cwnd();
}
}
impl<T: Ipc, A: GenericCongAvoidFlow> Flow<T, A> {
/// Make no updates in the datapath, and send a report after an interval
fn install_datapath_interval(&mut self, interval: time::Duration) -> Scope {
self.control_channel
.set_program(
"DatapathIntervalProg",
Some(&[("reportTime", interval.num_microseconds().unwrap() as u32)][..]),
)
.unwrap()
}
/// Make no updates in the datapath, and send a report after each RTT
fn install_datapath_interval_rtt(&mut self) -> Scope {
self.control_channel
.set_program("DatapathIntervalRTTProg", None)
.unwrap()
}
/// Make no updates in the datapath, but send a report on every ack.
fn install_ack_update(&mut self) -> Scope {
self.control_channel
.set_program("AckUpdateProg", None)
.unwrap()
}
/// Don't update acked, since those acks are already accounted for in slow start.
/// Send a report once there is a drop or timeout.
fn install_ss_update(&mut self) -> Scope {
self.control_channel
.set_program("SSUpdateProg", None)
.unwrap()
}
fn update_cwnd(&self) {
if let Err(e) = self
.control_channel
.update_field(&self.sc, &[("Cwnd", self.alg.curr_cwnd())])
{
self.logger.as_ref().map(|log| {
warn!(log, "Cwnd update error";
"err" => ?e,
);
});
}
}
fn get_fields(&mut self, m: &Report) -> GenericCongAvoidMeasurements {
let sc = &self.sc;
let ack = m
.get_field(&String::from("Report.acked"), sc)
.expect("expected acked field in returned measurement") as u32;
let sack = m
.get_field(&String::from("Report.sacked"), sc)
.expect("expected sacked field in returned measurement") as u32;
let was_timeout =
m.get_field(&String::from("Report.timeout"), sc)
.expect("expected timeout field in returned measurement") as u32;
let inflight =
m.get_field(&String::from("Report.inflight"), sc)
.expect("expected inflight field in returned measurement") as u32;
let loss = m
.get_field(&String::from("Report.loss"), sc)
.expect("expected loss field in returned measurement") as u32;
let rtt = m
.get_field(&String::from("Report.rtt"), sc)
.expect("expected rtt field in returned measurement") as u32;
GenericCongAvoidMeasurements {
acked: ack,
was_timeout: was_timeout == 1,
sacked: sack,
loss,
rtt,
inflight,
}
}
fn handle_timeout(&mut self) {
self.ss_thresh /= 2;
if self.ss_thresh < self.init_cwnd {
self.ss_thresh = self.init_cwnd;
}
self.alg.reset();
self.alg.set_cwnd(self.init_cwnd);
self.curr_cwnd_reduction = 0;
self.logger.as_ref().map(|log| {
warn!(log, "timeout";
"curr_cwnd (pkts)" => self.init_cwnd / self.mss,
"ssthresh" => self.ss_thresh,
);
});
self.update_cwnd();
return;
}
fn maybe_reduce_cwnd(&mut self, m: &GenericCongAvoidMeasurements) {
let old_deficit = self.curr_cwnd_reduction;
if m.loss > 0 || m.sacked > 0 {
if self.deficit_timeout > 0
&& ((time::now().to_timespec() - self.last_cwnd_reduction)
> time::Duration::microseconds(
(f64::from(self.rtt) * self.deficit_timeout as f64) as i64,
))
{
self.curr_cwnd_reduction = 0;
}
// if loss indicator is nonzero
// AND the losses in the lossy cwnd have not yet been accounted for
// OR there is a partial ACK AND cwnd was probing ss_thresh
if m.loss > 0 && self.curr_cwnd_reduction == 0
//|| (m.acked > 0 && self.alg.curr_cwnd() == self.ss_thresh)
{
//self.logger.as_ref().map(|log| {
// info!(log, "reduction";
// "loss" => m.loss,
// "deficit" => self.curr_cwnd_reduction,
// "sacked" => m.sacked,
// "acked" => m.acked,
// "cwnd" => self.alg.curr_cwnd(),
// "ssthresh" => self.ss_thresh,
// );
//});
self.alg.reduction(m);
self.last_cwnd_reduction = time::now().to_timespec();
self.ss_thresh = self.alg.curr_cwnd();
self.update_cwnd();
}
self.curr_cwnd_reduction += m.sacked + m.loss;
} else if m.acked < self.curr_cwnd_reduction {
self.curr_cwnd_reduction -= (m.acked as f32 / self.mss as f32) as u32;
} else {
self.curr_cwnd_reduction = 0;
}
if old_deficit > 0 || self.curr_cwnd_reduction > 0 {
//self.logger.as_ref().map(|log| {
// info!(log, "deficit";
// "old" => old_deficit,
// "new" => self.curr_cwnd_reduction,
// );
//});
}
}
fn slow_start_increase(&mut self, acked: u32) -> u32 {
let mut new_bytes_acked = acked;
if self.alg.curr_cwnd() < self.ss_thresh {
// increase cwnd by 1 per packet, until ssthresh
if self.alg.curr_cwnd() + new_bytes_acked > self.ss_thresh {
new_bytes_acked -= self.ss_thresh - self.alg.curr_cwnd();
self.alg.set_cwnd(self.ss_thresh);
} else {
let curr_cwnd = self.alg.curr_cwnd();
if self.use_compensation {
// use a compensating increase function: deliberately overshoot
// the "correct" update to keep account for lost throughput due to
// infrequent updates. Usually this doesn't matter, but it can when
// the window is increasing exponentially (slow start).
let delta = f64::from(new_bytes_acked) / (2.0_f64).ln();
self.alg.set_cwnd(curr_cwnd + delta as u32);
// let ccp_rtt = (rtt_us + 10_000) as f64;
// let delta = ccp_rtt * ccp_rtt / (rtt_us as f64 * rtt_us as f64);
// self.cwnd += (new_bytes_acked as f64 * delta) as u32;
} else {
self.alg.set_cwnd(curr_cwnd + new_bytes_acked);
}
new_bytes_acked = 0
}
}
new_bytes_acked
}
}

View File

@ -1,55 +0,0 @@
extern crate portus_export;
extern crate slog;
pub use crate::GenericCongAvoidAlg;
pub use crate::GenericCongAvoidFlow;
pub use crate::GenericCongAvoidMeasurements;
#[derive(Default)]
pub struct Reno {
mss: u32,
init_cwnd: f64,
cwnd: f64,
}
impl GenericCongAvoidAlg for Reno {
type Flow = Self;
fn name() -> &'static str {
"reno"
}
fn with_args(_: &clap::ArgMatches) -> Self {
Default::default()
}
fn new_flow(&self, _logger: Option<slog::Logger>, init_cwnd: u32, mss: u32) -> Self::Flow {
Reno {
mss,
init_cwnd: f64::from(init_cwnd),
cwnd: f64::from(init_cwnd),
}
}
}
impl GenericCongAvoidFlow for Reno {
fn curr_cwnd(&self) -> u32 {
self.cwnd as u32
}
fn set_cwnd(&mut self, cwnd: u32) {
self.cwnd = f64::from(cwnd);
}
fn increase(&mut self, m: &GenericCongAvoidMeasurements) {
// increase cwnd by 1 / cwnd per packet
self.cwnd += 3.0 * f64::from(self.mss) * (f64::from(m.acked) / self.cwnd);
}
fn reduction(&mut self, _m: &GenericCongAvoidMeasurements) {
self.cwnd /= 2.0;
if self.cwnd <= self.init_cwnd {
self.cwnd = self.init_cwnd;
}
}
}

View File

@ -1,97 +0,0 @@
// Step 1/2: Add new alg crates here.
//
// NOTE: Must be listed in TARGETS deps
extern crate ccp_const;
extern crate ccp_generic_cong_avoid;
extern crate clap;
extern crate portus;
use libc::c_char;
use portus::ipc::unix::Socket as S;
use portus::ipc::Blocking as B;
use portus::{CongAlg, CongAlgBuilder};
use std::sync::{atomic, Arc};
use std::ffi::CStr;
use std::fs::File;
use std::os::unix::io::FromRawFd;
use ccp_generic_cong_avoid::{cubic::Cubic, reno::Reno};
fn _start(args: String, out: Option<File>, uid: u64, handle: *const atomic::AtomicBool) {
// Parse config arguments
let argv = args.split_whitespace();
let alg_name = argv.clone().next().expect("empty argument string");
let log = out.map_or_else(
|| portus::algs::make_logger(),
|f| portus::algs::make_file_logger(f),
);
let portus_bindaddr = format!("{}/{}", uid.to_string(), "portus");
let backend = S::<B>::new(&portus_bindaddr)
.map(|sk| portus::ipc::BackendBuilder { sock : sk })
.expect("ipc initialization");
let mut available_algs = vec![];
macro_rules! register_alg {
($alg:ident) => {
let reg_name = <$alg::__ccp_alg_export as CongAlg<S<B>>>::name();
available_algs.push(reg_name);
if alg_name == reg_name {
let args = $alg::__ccp_alg_export::args();
let matches = args.get_matches_from(argv);
let alg =
$alg::__ccp_alg_export::with_arg_matches(&matches, Some(log.clone())).unwrap();
portus::run_with_handle::<_, _>(backend, portus::Config { logger : Some(log) }, alg, handle).unwrap();
return;
}
};
($pkg:ident, $base:ident, $alg:ident) => {
let reg_name = <$pkg::$base<$alg> as CongAlg<S<B>>>::name();
available_algs.push(reg_name);
if alg_name == reg_name {
let args = <$pkg::$base<$alg> as CongAlgBuilder<'_, '_>>::args();
let matches = args.get_matches_from(argv);
let alg = <$pkg::$base<$alg> as CongAlgBuilder<'_, '_>>::with_arg_matches(
&matches,
Some(log.clone()),
)
.unwrap();
portus::run_with_handle::<_, _>(backend, portus::Config { logger : Some(log) }, alg, handle).unwrap();
return;
}
};
}
register_alg!(ccp_const);
register_alg!(ccp_generic_cong_avoid, Alg, Reno);
register_alg!(ccp_generic_cong_avoid, Alg, Cubic);
unreachable!("error: algorithm '{}' not found! available algorithms: {:#?}", alg_name, available_algs);
}
#[no_mangle]
pub extern "C" fn ccp_create_handle() -> *const atomic::AtomicBool {
let handle = Arc::new(atomic::AtomicBool::new(true));
println!("{:#?}", handle);
return Arc::into_raw(handle)
}
#[no_mangle]
pub extern "C" fn ccp_spawn(c_args: *const c_char, log_fd: i32, uid: u64, handle: *const atomic::AtomicBool) {
let args = unsafe { CStr::from_ptr(c_args) }
.to_string_lossy()
.into_owned();
let f = unsafe { File::from_raw_fd(log_fd) };
_start(args, Some(f), uid, handle)
}
#[no_mangle]
pub extern "C" fn ccp_kill(handle: *const atomic::AtomicBool) {
let a = unsafe { Arc::from_raw(handle) };
a.store(false, std::sync::atomic::Ordering::SeqCst);
// forget so we don't drop it automatically at the end, calling code has ownership
let _ = Arc::into_raw(a);
}

View File

@ -1,7 +0,0 @@
#include <cstdint>
extern "C" {
bool *ccp_create_handle();
void ccp_spawn(const char* args, uint32_t log_fd, uint64_t uid, bool *handle);
void ccp_kill(bool *handle);
}

View File

@ -1,354 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/server/CCPReader.h>
#include <quic/QuicConstants.h>
#include <quic/common/Timers.h>
#include <quic/congestion_control/QuicCCP.h>
#ifdef CCP_ENABLED
#include <ccp/ccp.h>
#include <ccp/ccp_error.h>
#endif
#include <folly/detail/IPAddress.h>
#define CCP_UNIX_BASE "/ccp/"
#define FROM_CCP "mvfst"
#define TO_CCP "portus"
#define CCP_MAX_MSG_SIZE 32678
namespace quic {
/**
* In order to keep logic simple for callers of CCPReader (eg. QuicServer), this
* implementation internalizes checks for whether or not ccp is enabled. If ccp
* is enabled, the functions are defined as expected. If not, the functions are
* just empty stubs that ignore their args and do nothing (see bottom).
*/
#ifdef CCP_ENABLED
/*
* libccp callbacks
*
* These functions are not called by CCPReader directly. We pass a reference of
* them to libccp so that it has a simple interface to modify state (eg. cwnd)
* within the datapath as necessary.
*/
extern "C" {
/**
* In libccp, all time is relative to process start.
* NOTE: This is a global variable (as opposed to a field in CCPReader), because
* the time functions, such as libccp::now, are stateless and take no
* parameters.
* TODO: in a future version, libccp should pass a pointer to the datapath
* struct, so we can recover the corresponding CCPReader and keep this variable
* as a field there instead.
*/
uint64_t init_time_ns = 0;
void _ccp_set_cwnd(struct ccp_connection* conn, uint32_t cwnd) {
// Look up the CCP cc algorithm instance for this connection
CCP* alg = (CCP*)ccp_get_impl(conn);
alg->setCongestionWindow(cwnd);
}
void _ccp_set_rate_abs(struct ccp_connection* conn, uint32_t rate) {
CCP* alg = (CCP*)ccp_get_impl(conn);
alg->setPacingRate(rate);
}
int _ccp_send_msg(struct ccp_connection* conn, char* msg, int msg_size) {
std::unique_ptr<folly::IOBuf> buf = folly::IOBuf::wrapBuffer(msg, msg_size);
// Since this function is called by libccp, we are not within the CCPReader
// and thus need to look up a reference to it. This is stored within the
// datapath object created in CCPReader::try_initialize.
CCPReader* reader = (CCPReader*)conn->datapath->impl;
auto ret = reader->writeOwnedBuffer(std::move(buf));
if (ret < 0) {
LOG(ERROR) << "ccp_send_msg failed ret=" << ret << " errno=" << errno;
}
return ret;
}
static void _ccp_log(
struct ccp_datapath* /*dp*/,
enum ccp_log_level level,
const char* msg,
int /*msg_size*/) {
LOG(ERROR) << "[libccp." << level << "] " << msg;
}
// The next 3 functions are used to provide libccp with a sense of time.
// It uses these functions to implement timers, so the timers can only be
// as accurate as the notion of time provided here.
uint64_t _ccp_now_usecs() {
struct timespec now;
uint64_t now_ns, now_us;
clock_gettime(CLOCK_MONOTONIC, &now);
now_ns = (1000000000L * now.tv_sec) + now.tv_nsec;
if (init_time_ns == 0) {
init_time_ns = now_ns;
}
now_us = ((now_ns - init_time_ns) / 1000) & 0xffffffff;
return now_us;
}
uint64_t _ccp_since_usecs(uint64_t then) {
return _ccp_now_usecs() - then;
}
uint64_t _ccp_after_usecs(uint64_t usecs) {
return _ccp_now_usecs() + usecs;
}
} // end libccp callbacks
CCPReader::CCPReader() = default;
void CCPReader::try_initialize(
folly::EventBase* evb,
uint64_t ccpId,
uint64_t parentServerId,
uint8_t parentWorkerId) {
evb_ = evb;
ccpId_ = ccpId;
serverId_ = parentServerId;
workerId_ = parentWorkerId;
// Even though it is technically called an Async*UDP*Socket by folly,
// this is really a unix socket!
ccpSocket_ = std::make_unique<folly::AsyncUDPSocket>(evb_);
bind();
// libccp asks us to allocate this struct manually (as opposed to allocating
// it internally as it does for other things), to allow the max connections to
// be configurable. This array is used by libccp to keep track of all the
// connections that are currently active. The index is the connection id.
struct ccp_connection* active_connections = (struct ccp_connection*)calloc(
MAX_CONCURRENT_CONNECTIONS_LIBCCP, sizeof(struct ccp_connection));
if (active_connections == nullptr) {
LOG(ERROR) << "[ccp] failed to allocate per-connection data structure";
return;
}
// Giving libccp a reference to all of the callbacks
ccpDatapath_.set_cwnd = &_ccp_set_cwnd;
ccpDatapath_.set_rate_abs = &_ccp_set_rate_abs;
ccpDatapath_.send_msg = &_ccp_send_msg;
ccpDatapath_.log = &_ccp_log;
ccpDatapath_.now = &_ccp_now_usecs;
ccpDatapath_.since_usecs = &_ccp_since_usecs;
ccpDatapath_.after_usecs = &_ccp_after_usecs;
ccpDatapath_.impl = (void*)this;
ccpDatapath_.ccp_active_connections = active_connections;
ccpDatapath_.max_connections = MAX_CONCURRENT_CONNECTIONS_LIBCCP;
ccpDatapath_.max_programs = MAX_DATAPATH_PROGRAMS_LIBCCP;
ccpDatapath_.fto_us = FALLBACK_TIMEOUT_US_LIBCCP;
// This function registers us (QuicServerWorker+CCPReader) as a "datapath"
// within libccp. Libccp itself is stateless, so all of our state is
// maintained in this struct and thus must be passed to any future calls to
// libccp.
int ret = ccp_init(&ccpDatapath_);
if (ret < 0) {
LOG(ERROR) << "[ccp] ccp_init failed ret=" << ret;
throw std::runtime_error("internal bug: unable to interface with libccp");
}
}
int CCPReader::connect() {
// This message registers us with CCP. CCP ignores messages from
// datapaths that have not yet registered with it. It identifies a datapath
// by the sending address (/ccp/mvfst{id}).
char ready_buf[READY_MSG_SIZE];
int wrote = write_ready_msg(ready_buf, READY_MSG_SIZE, workerId_);
std::unique_ptr<folly::IOBuf> ready_msg =
folly::IOBuf::wrapBuffer(ready_buf, wrote);
int ret = writeOwnedBuffer(std::move(ready_msg));
// Since we start ccp within a separate thread from QuicServer, its possible
// that it hasn't been scheduled yet when we send this message, in which case
// we will get an unable to connet to socket error (111). If this happens,
// we return a failure and expected the caller to wait and retry later.
// If we can't connect after a few tries then something is probably wrong
// with CCP and we should give up.
if (ret < 0) {
LOG(ERROR) << "[ccp] write_ready_msg failed ret=" << ret
<< " errno=" << errno;
} else {
LOG(INFO) << "[ccp] write_ready_msg success";
initialized_ = true;
}
return ret;
}
folly::EventBase* CCPReader::getEventBase() const {
return evb_;
}
/**
* We communicate with CCP via unix domain sockets.
* CCP owns the address /ccp/portus
* Each QuicServerWorker has its own address, distinguished by worker id:
* ccp/mvfst{id}
*/
void CCPReader::bind() {
CHECK(ccpSocket_);
// In order to prevent permission issues in production environments, we use
// "abstract" sockets, which do not actually create an entry in the
// filesystem. An abstract socket is created by prepending a nullbyte to the
// desired path name. FYI, when displayed by system utilities, abstract socket
// names appear with an @ as the first character to denote the null byte.
std::string recvPath(1, 0);
recvPath.append(CCP_UNIX_BASE);
recvPath.append(
std::to_string(ccpId_) + "/" + std::to_string(serverId_) + "/");
recvPath.append(FROM_CCP);
recvPath.append(std::to_string(workerId_));
recvAddr_.setFromPath(recvPath);
ccpSocket_->bind(recvAddr_);
// Again we start the address with a null byte here to make it abstract
// Since these are connectionless sockets, we don't need to connect to this
// address.
std::string sendPath(1, 0);
sendPath.append(CCP_UNIX_BASE);
sendPath.append(std::to_string(ccpId_) + "/");
sendPath.append(TO_CCP);
sendAddr_.setFromPath(sendPath);
}
void CCPReader::start() {
ccpSocket_->resumeRead(this);
}
void CCPReader::pauseRead() {
CHECK(ccpSocket_);
ccpSocket_->pauseRead();
}
void CCPReader::getReadBuffer(void** buf, size_t* len) noexcept {
// TODO should this be initialized once in the constructor and re-used here?
readBuffer_ = folly::IOBuf::create(CCP_MAX_MSG_SIZE);
*buf = readBuffer_->writableData();
*len = CCP_MAX_MSG_SIZE;
}
void CCPReader::onDataAvailable(
const folly::SocketAddress& /*client*/,
size_t len,
bool truncated,
OnDataAvailableParams /*params*/) noexcept {
// TODO if read buffer is re-used, shouldn't move it here
// Move readBuffer_ first to get rid of it immediately so that if we return
// early, we've flushed it.
Buf data = std::move(readBuffer_);
if (truncated || len <= 0) {
// This is an error, drop the packet.
return;
}
// TODO is this needed?
// data->append(len);
char* buf = (char*)data->data();
int ret = ccp_read_msg(&ccpDatapath_, buf, len);
// After a connection ends, we may get 1 more message from ccp about it,
// but we've already removed it from our local list of connections,
// so libccp will return LIBCCP_UNKNOWN_CONNECTION, which we can ignore
if (ret < 0 && ret != LIBCCP_UNKNOWN_CONNECTION) {
LOG(ERROR) << "ccp_read_msg failed ret=" << ret;
}
}
void CCPReader::onReadError(const folly::AsyncSocketException& ex) noexcept {
LOG(ERROR) << "ccpReader onReadError: " << ex.what();
}
void CCPReader::onReadClosed() noexcept {
shutdown();
}
ssize_t CCPReader::writeOwnedBuffer(std::unique_ptr<folly::IOBuf> buf) {
return ccpSocket_->write(sendAddr_, buf);
}
uint8_t CCPReader::getWorkerId() const noexcept {
return workerId_;
}
struct ccp_datapath* CCPReader::getDatapath() noexcept {
return initialized_ ? &ccpDatapath_ : nullptr;
}
void CCPReader::shutdown() {
if (ccpSocket_) {
try {
ccpSocket_->pauseRead();
ccpSocket_->close();
} catch (...) {
}
ccpSocket_ = nullptr;
}
if (ccpDatapath_.ccp_active_connections) {
free(ccpDatapath_.ccp_active_connections);
}
ccp_free(&ccpDatapath_);
}
CCPReader::~CCPReader() {
// We allocated the list of active connections and ccp_datapath struct,
// so we are responsible for freeing them
shutdown();
}
#else // Empty method placeholders for when ccp is not enabled:
CCPReader::CCPReader() = default;
void CCPReader::try_initialize(
folly::EventBase* evb,
uint64_t,
uint64_t,
uint8_t) {
evb_ = evb;
}
int CCPReader::connect() {
return 0;
}
folly::EventBase* CCPReader::getEventBase() const {
return evb_;
}
void CCPReader::bind() {}
void CCPReader::start() {}
void CCPReader::pauseRead() {}
void CCPReader::getReadBuffer(void**, size_t*) noexcept {}
void CCPReader::onDataAvailable(
const folly::SocketAddress&,
size_t,
bool,
OnDataAvailableParams) noexcept {}
void CCPReader::onReadError(const folly::AsyncSocketException&) noexcept {}
void CCPReader::onReadClosed() noexcept {}
ssize_t CCPReader::writeOwnedBuffer(std::unique_ptr<folly::IOBuf>) {
return 0;
}
uint8_t CCPReader::getWorkerId() const noexcept {
return workerId_;
}
void CCPReader::shutdown() {}
CCPReader::~CCPReader() = default;
#endif
} // namespace quic

View File

@ -1,121 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/SocketAddress.h>
#include <folly/io/async/AsyncUDPSocket.h>
#ifdef CCP_ENABLED
#include <ccp/ccp.h>
#endif
/**
* Libccp internally allocates a fixed-size array to manage state for active
* connections. This number defines the size of that array and thus the maximum
* number of connections we expect to be active at any one time. Once exceeded,
* ccp will ignore new connections.
* TODO add error handling for this case
*/
#define MAX_CONCURRENT_CONNECTIONS_LIBCCP 1024
/**
* This is the maximum number of datapath programs a ccp algorithm may
* install. 10 should be sufficient, the average algorithm only needs to
* install 1, but more complicated algorithms may use a few.
*/
#define MAX_DATAPATH_PROGRAMS_LIBCCP 10
/**
*
* This is the fallback timeout length, in microseconds. If libccp hasn't
* received a response from CCP within this amount of time, it enters fallback
* mode, and all future calls to libccp return an error until connection with
* ccp is restored. This logic is handled within QuicCCP. Note that libccp
* checks for this timeout using the time callbacks above, so its ability to
* achieve this value exactly is limited by the resolution of those functions.
*/
#define FALLBACK_TIMEOUT_US_LIBCCP 1000000
namespace quic {
/**
* Each instance of QuicServerWorker is considered a separate "datapath" by CCP
* and thus communicates with CCP independently in order to prevent the need for
* any coordination between workers.
*
* Each worker has its own corresponding instance of CCPReader to handle
* recieving messages from CCP and dispatching them to the correct connection
* (the cc algorithm implementation, QuicCCP, handles sending messages to CCP).
* This CCPReader runs in the same event base as the worker.
*
* The current implementation assumes communication with CCP via unix sockets.
* CCP can be configured to use other IPC mechanisms if necessary.
*
*/
class CCPReader : public folly::AsyncUDPSocket::ReadCallback {
public:
explicit CCPReader();
~CCPReader() override;
// Initialize state (with libccp) and send a ready message to CCP
void try_initialize(
folly::EventBase* evb,
uint64_t ccpId,
uint64_t serverId,
uint8_t workerId);
// Start listening on the socket
void start();
// Send a ready message to CCP
int connect();
// Pause listening on the socket
void pauseRead();
// Standard ReadCallback functions
void getReadBuffer(void** buf, size_t* len) noexcept override;
void onDataAvailable(
const folly::SocketAddress& client,
size_t len,
bool truncated,
OnDataAvailableParams params) noexcept override;
void onReadError(const folly::AsyncSocketException& ex) noexcept override;
void onReadClosed() noexcept override;
// The id of the corresponding worker thread (running in the same evb)
FOLLY_NODISCARD uint8_t getWorkerId() const noexcept;
FOLLY_NODISCARD folly::EventBase* getEventBase() const;
// Send a message to CCP at the unix socket /ccp/portus from our address
// /ccp/mvfst{id}
ssize_t writeOwnedBuffer(std::unique_ptr<folly::IOBuf> buf);
// Get a reference to the corresponding ccp_datapath struct
// (libccp's wrapper around a QuicServerWorker), needed
// for interacting with libccp. This gets passed to CCP alg instances
// by the QuicServerConnectionState object.
#ifdef CCP_ENABLED
struct ccp_datapath* FOLLY_NULLABLE getDatapath() noexcept;
#endif
void shutdown();
private:
// Bind to our unix socket address
void bind();
// Each ccp reader is uniquely identified by its parent
// QuicServer/QuicServerWorker pair
uint8_t workerId_;
folly::SocketAddress sendAddr_;
folly::SocketAddress recvAddr_;
std::unique_ptr<folly::AsyncUDPSocket> ccpSocket_;
folly::EventBase* evb_{nullptr};
std::unique_ptr<folly::IOBuf> readBuffer_;
#ifdef CCP_ENABLED
struct ccp_datapath ccpDatapath_;
#endif
};
} // namespace quic

View File

@ -45,8 +45,6 @@ add_library(
QuicServerPacketRouter.cpp QuicServerPacketRouter.cpp
QuicServerTransport.cpp QuicServerTransport.cpp
QuicServerWorker.cpp QuicServerWorker.cpp
CCPReader.cpp
QuicCcpThreadLauncher.cpp
SlidingWindowRateLimiter.cpp SlidingWindowRateLimiter.cpp
handshake/DefaultAppTokenValidator.cpp handshake/DefaultAppTokenValidator.cpp
handshake/TokenGenerator.cpp handshake/TokenGenerator.cpp
@ -120,18 +118,6 @@ add_dependencies(
mvfst_state_ack_handler mvfst_state_ack_handler
) )
if(DEFINED CCP_ENABLED)
# This target is generated by running cargo, so cmake needs to know where
# to find it. See quic/congestion_control/third_party/ccp/CMakeLists.txt
# When CCP is not enabled, LIBSTARTCCP_LIBRARY is empty, and thus ignored
# during linking.
get_target_property(LIBSTARTCCP_LIBRARY libstartccp LOCATION)
# This target is only defined when CCP_ENABLED, so we can ony add it here.
add_dependencies(mvfst_server libstartccp)
add_dependencies(mvfst_server_state libstartccp)
endif()
target_link_libraries( target_link_libraries(
mvfst_server_state PUBLIC mvfst_server_state PUBLIC
Folly::folly Folly::folly
@ -146,8 +132,6 @@ target_link_libraries(
mvfst_state_ack_handler mvfst_state_ack_handler
mvfst_transport mvfst_transport
mvfst_transport_knobs mvfst_transport_knobs
${LIBCCP_LIBRARY}
${LIBSTARTCCP_LIBRARY}
) )
target_link_libraries( target_link_libraries(
@ -166,8 +150,6 @@ target_link_libraries(
mvfst_state_datagram_handler mvfst_state_datagram_handler
mvfst_transport mvfst_transport
mvfst_transport_knobs mvfst_transport_knobs
${LIBCCP_LIBRARY}
${LIBSTARTCCP_LIBRARY}
) )
target_link_libraries( target_link_libraries(

View File

@ -1,47 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/server/QuicCcpThreadLauncher.h>
namespace quic {
void QuicCcpThreadLauncher::start(
FOLLY_MAYBE_UNUSED const std::string& ccpConfig) {
#ifdef CCP_ENABLED
ccpId_ = folly::Random::secureRand64();
handle_ = ccp_create_handle();
ccpEvb_.getEventBase()->runInEventBaseThread([&, handle = handle_] {
// NOTE second arg is fd of output device
// hardcoded to 2 for stderr
ccp_spawn(ccpConfig.c_str(), 2, ccpId_, handle);
});
#else
VLOG(2) << "WARN: tried to launch ccp, but ccp not enabled";
#endif
}
bool QuicCcpThreadLauncher::hasLaunched() {
return ccpId_ != 0;
}
uint64_t QuicCcpThreadLauncher::getCcpId() {
#ifdef CCP_ENABLED
return ccpId_;
#else
return 0;
#endif
}
void QuicCcpThreadLauncher::stop() {
#ifdef CCP_ENABLED
if (hasLaunched_()) {
ccp_kill(handle_);
}
#else
#endif
}
} // namespace quic

View File

@ -1,38 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#ifdef CCP_ENABLED
#include <quic/congestion_control/third_party/ccp/libstartccp.h>
#endif
#include <folly/Random.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <cstdint>
#include <string>
namespace quic {
class QuicCcpThreadLauncher {
public:
QuicCcpThreadLauncher() {
ccpId_ = 0;
}
void start(const std::string& ccpConfig);
bool hasLaunched();
uint64_t getCcpId();
void stop();
private:
#ifdef CCP_ENABLED
folly::ScopedEventBaseThread ccpEvb_;
#endif
uint64_t ccpId_;
};
} // namespace quic

View File

@ -12,15 +12,11 @@
#include <folly/portability/GFlags.h> #include <folly/portability/GFlags.h>
#include <quic/codec/DefaultConnectionIdAlgo.h> #include <quic/codec/DefaultConnectionIdAlgo.h>
#include <quic/codec/QuicHeaderCodec.h> #include <quic/codec/QuicHeaderCodec.h>
#include <iterator>
#ifdef CCP_ENABLED
#include <quic/congestion_control/third_party/ccp/libstartccp.h>
#endif
#include <quic/server/CCPReader.h>
#include <quic/server/QuicReusePortUDPSocketFactory.h> #include <quic/server/QuicReusePortUDPSocketFactory.h>
#include <quic/server/QuicServerTransport.h> #include <quic/server/QuicServerTransport.h>
#include <quic/server/QuicSharedUDPSocketFactory.h> #include <quic/server/QuicSharedUDPSocketFactory.h>
#include <quic/server/SlidingWindowRateLimiter.h> #include <quic/server/SlidingWindowRateLimiter.h>
#include <iterator>
FOLLY_GFLAGS_DEFINE_bool( FOLLY_GFLAGS_DEFINE_bool(
qs_io_uring_use_async_recv, qs_io_uring_use_async_recv,
@ -235,19 +231,12 @@ void QuicServer::bindWorkersToSocket(
auto numWorkers = evbs.size(); auto numWorkers = evbs.size();
CHECK(!initialized_); CHECK(!initialized_);
boundAddress_ = address; boundAddress_ = address;
auto usingCCP = isUsingCCP();
if (!usingCCP) {
VLOG(2) << "NOT using CCP";
}
auto ccpInitFailed = false;
for (size_t i = 0; i < numWorkers; ++i) { for (size_t i = 0; i < numWorkers; ++i) {
auto workerEvb = evbs[i]; auto workerEvb = evbs[i];
workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait( workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait(
[self = this->shared_from_this(), [self = this->shared_from_this(),
workerEvb, workerEvb,
numWorkers, numWorkers,
usingCCP,
&ccpInitFailed,
processId = processId_, processId = processId_,
idx = i] { idx = i] {
std::lock_guard<std::mutex> guard(self->startMutex_); std::lock_guard<std::mutex> guard(self->startMutex_);
@ -288,28 +277,6 @@ void QuicServer::bindWorkersToSocket(
self->boundAddress_ = worker->getAddress(); self->boundAddress_ = worker->getAddress();
} }
} }
if (usingCCP) {
auto serverId = self->boundAddress_.getIPAddress().hash() |
self->boundAddress_.getPort();
try {
worker->getCcpReader()->try_initialize(
worker->getEventBase(),
self->ccpId_,
serverId,
worker->getWorkerId());
worker->getCcpReader()->connect();
} catch (const folly::AsyncSocketException& ex) {
// probably means the unix socket failed to bind
LOG(ERROR) << "exception while initializing ccp: " << ex.what()
<< "\nshutting down...";
// TODO also update counters
ccpInitFailed = true;
} catch (const std::exception& ex) {
LOG(ERROR) << "exception initializing ccp: " << ex.what()
<< "\nshutting down...";
ccpInitFailed = true;
}
}
if (idx == (numWorkers - 1)) { if (idx == (numWorkers - 1)) {
VLOG(4) << "Initialized all workers in the eventbase"; VLOG(4) << "Initialized all workers in the eventbase";
self->initialized_ = true; self->initialized_ = true;
@ -317,9 +284,6 @@ void QuicServer::bindWorkersToSocket(
} }
}); });
} }
if (usingCCP && ccpInitFailed) {
shutdown();
}
} }
void QuicServer::start() { void QuicServer::start() {
@ -331,14 +295,9 @@ void QuicServer::start() {
workerPtr_.reset( workerPtr_.reset(
worker, [](auto /* worker */, folly::TLPDestructionMode) {}); worker, [](auto /* worker */, folly::TLPDestructionMode) {});
}); });
auto usingCCP = isUsingCCP();
for (auto& worker : workers_) { for (auto& worker : workers_) {
worker->getEventBase()->runInEventBaseThread([&worker, usingCCP] { worker->getEventBase()->runInEventBaseThread(
if (usingCCP) { [&worker] { worker->start(); });
worker->getCcpReader()->start();
}
worker->start();
});
} }
} }
@ -509,13 +468,9 @@ void QuicServer::shutdown(LocalErrorCode error) {
return; return;
} }
shutdown_ = true; shutdown_ = true;
auto usingCCP = isUsingCCP();
for (auto& worker : workers_) { for (auto& worker : workers_) {
worker->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&] { worker->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
worker->shutdownAllConnections(error); worker->shutdownAllConnections(error);
if (usingCCP) {
worker->getCcpReader()->shutdown();
}
workerPtr_.reset(); workerPtr_.reset();
}); });
// protecting the erase in map with the mutex since // protecting the erase in map with the mutex since
@ -626,30 +581,6 @@ void QuicServer::setTransportSettings(TransportSettings transportSettings) {
}); });
} }
void QuicServer::setCcpId(uint64_t ccpId) {
ccpId_ = ccpId;
}
bool QuicServer::isUsingCCP() {
auto foundId = ccpId_ != 0;
#ifdef CCP_ENABLED
auto default_is_ccp = transportSettings_.defaultCongestionController ==
CongestionControlType::CCP;
if (foundId && default_is_ccp) {
return true;
} else {
return false;
}
return foundId;
#else
if (foundId) {
LOG(ERROR)
<< "found ccp id, but server was not compiled with ccp support. recompile with -DCCP_ENABLED.";
}
return false;
#endif
}
void QuicServer::rejectNewConnections(std::function<bool()> rejectFn) { void QuicServer::rejectNewConnections(std::function<bool()> rejectFn) {
rejectNewConnections_ = rejectFn; rejectNewConnections_ = rejectFn;
runOnAllWorkers([rejectFn](auto worker) mutable { runOnAllWorkers([rejectFn](auto worker) mutable {

View File

@ -179,14 +179,6 @@ class QuicServer : public QuicServerWorker::WorkerCallback,
*/ */
void setTransportSettings(TransportSettings transportSettings); void setTransportSettings(TransportSettings transportSettings);
/**
* If the calling application wants to use CCP for CC, it's the
* app's responsibility to start an instance of CCP -- this ID
* refers to that unique instance of CCP so we (QuicServer) know
* how to connect to it.
*/
void setCcpId(uint64_t ccpId);
/** /**
* Tells the server to start rejecting any new connection. The parameter * Tells the server to start rejecting any new connection. The parameter
* function is stored and evaluated on each new connection before being * function is stored and evaluated on each new connection before being
@ -409,8 +401,6 @@ class QuicServer : public QuicServerWorker::WorkerCallback,
QuicVersion::QUIC_V1_ALIAS, QuicVersion::QUIC_V1_ALIAS,
QuicVersion::QUIC_DRAFT}}; QuicVersion::QUIC_DRAFT}};
bool isUsingCCP();
std::atomic<bool> shutdown_{true}; std::atomic<bool> shutdown_{true};
std::shared_ptr<const fizz::server::FizzServerContext> ctx_; std::shared_ptr<const fizz::server::FizzServerContext> ctx_;
TransportSettings transportSettings_; TransportSettings transportSettings_;
@ -470,14 +460,6 @@ class QuicServer : public QuicServerWorker::WorkerCallback,
// Options to AsyncUDPSocket::bind, only controls IPV6_ONLY currently. // Options to AsyncUDPSocket::bind, only controls IPV6_ONLY currently.
folly::AsyncUDPSocket::BindOptions bindOptions_; folly::AsyncUDPSocket::BindOptions bindOptions_;
#ifdef CCP_ENABLED
std::unique_ptr<folly::ScopedEventBaseThread> ccpEvb_;
#endif
// Random number to uniquely identify this instance of quic to ccp
// in case there are multiple concurrent instances (e.g. when proxygen is
// migrating connections and there are two concurrent instances of proxygen)
uint64_t ccpId_{0};
// set by getEventBaseBackend if multishot callback is // set by getEventBaseBackend if multishot callback is
// supprted // supprted
bool backendSupportsMultishotCallback_{false}; bool backendSupportsMultishotCallback_{false};

View File

@ -656,12 +656,6 @@ void QuicServerTransport::setBufAccessor(BufAccessor* bufAccessor) {
conn_->bufAccessor = bufAccessor; conn_->bufAccessor = bufAccessor;
} }
#ifdef CCP_ENABLED
void QuicServerTransport::setCcpDatapath(struct ccp_datapath* datapath) {
serverConn_->ccpDatapath = datapath;
}
#endif
const std::shared_ptr<const folly::AsyncTransportCertificate> const std::shared_ptr<const folly::AsyncTransportCertificate>
QuicServerTransport::getPeerCertificate() const { QuicServerTransport::getPeerCertificate() const {
const auto handshakeLayer = serverConn_->serverHandshakeLayer; const auto handshakeLayer = serverConn_->serverHandshakeLayer;
@ -721,16 +715,6 @@ void QuicServerTransport::registerAllTransportKnobParamHandlers() {
if (cctype == server_conn->congestionController->type()) { if (cctype == server_conn->congestionController->type()) {
return; return;
} }
if (cctype == CongestionControlType::CCP) {
bool ccpAvailable = false;
#ifdef CCP_ENABLED
ccpAvailable = server_conn->ccpDatapath != nullptr;
#endif
if (!ccpAvailable) {
LOG(ERROR) << "ccp not enabled on this server";
return;
}
}
server_conn->congestionController = server_conn->congestionController =
server_conn->congestionControllerFactory->makeCongestionController( server_conn->congestionControllerFactory->makeCongestionController(
*server_conn, cctype); *server_conn, cctype);

View File

@ -151,15 +151,6 @@ class QuicServerTransport
virtual void setBufAccessor(BufAccessor* bufAccessor); virtual void setBufAccessor(BufAccessor* bufAccessor);
#ifdef CCP_ENABLED
/*
* This function must be called with an initialized ccp_datapath (via
* libccp:ccp_init) before starting any connections using the CCP congestion
* control algorithm. See further notes on this struct in the header file.
*/
void setCcpDatapath(struct ccp_datapath* datapath);
#endif
const std::shared_ptr<const folly::AsyncTransportCertificate> const std::shared_ptr<const folly::AsyncTransportCertificate>
getPeerCertificate() const override; getPeerCertificate() const override;

View File

@ -26,7 +26,6 @@
#include <quic/congestion_control/Copa.h> #include <quic/congestion_control/Copa.h>
#include <quic/fizz/handshake/FizzRetryIntegrityTagGenerator.h> #include <quic/fizz/handshake/FizzRetryIntegrityTagGenerator.h>
#include <quic/server/AcceptObserver.h> #include <quic/server/AcceptObserver.h>
#include <quic/server/CCPReader.h>
#include <quic/server/QuicServerWorker.h> #include <quic/server/QuicServerWorker.h>
#include <quic/server/handshake/StatelessResetGenerator.h> #include <quic/server/handshake/StatelessResetGenerator.h>
#include <quic/server/handshake/TokenGenerator.h> #include <quic/server/handshake/TokenGenerator.h>
@ -58,7 +57,6 @@ QuicServerWorker::QuicServerWorker(
setEventCallback_(ec), setEventCallback_(ec),
takeoverPktHandler_(this), takeoverPktHandler_(this),
observerList_(this) { observerList_(this) {
ccpReader_ = std::make_unique<CCPReader>();
pending0RttData_.setPruneHook( pending0RttData_.setPruneHook(
[&](auto, auto) { QUIC_STATS(statsCallback_, onZeroRttBufferedPruned); }); [&](auto, auto) { QUIC_STATS(statsCallback_, onZeroRttBufferedPruned); });
} }
@ -647,9 +645,6 @@ QuicServerTransport::Ptr QuicServerWorker::makeTransport(
if (validNewToken) { if (validNewToken) {
trans->verifiedClientAddress(); trans->verifiedClientAddress();
} }
#ifdef CCP_ENABLED
trans->setCcpDatapath(getCcpReader()->getDatapath());
#endif
trans->setCongestionControllerFactory(ccFactory_); trans->setCongestionControllerFactory(ccFactory_);
trans->setTransportStatsCallback(statsCallback_.get()); // ok if nullptr trans->setTransportStatsCallback(statsCallback_.get()); // ok if nullptr
@ -1181,10 +1176,6 @@ void QuicServerWorker::setConnectionIdVersion(
cidVersion_ = cidVersion; cidVersion_ = cidVersion;
} }
CCPReader* QuicServerWorker::getCcpReader() const noexcept {
return ccpReader_.get();
}
void QuicServerWorker::setNewConnectionSocketFactory( void QuicServerWorker::setNewConnectionSocketFactory(
QuicUDPSocketFactory* factory) { QuicUDPSocketFactory* factory) {
socketFactory_ = factory; socketFactory_ = factory;

View File

@ -22,7 +22,6 @@
#include <quic/common/BufAccessor.h> #include <quic/common/BufAccessor.h>
#include <quic/common/Timers.h> #include <quic/common/Timers.h>
#include <quic/congestion_control/CongestionControllerFactory.h> #include <quic/congestion_control/CongestionControllerFactory.h>
#include <quic/server/CCPReader.h>
#include <quic/server/QuicServerPacketRouter.h> #include <quic/server/QuicServerPacketRouter.h>
#include <quic/server/QuicServerTransportFactory.h> #include <quic/server/QuicServerTransportFactory.h>
#include <quic/server/QuicUDPSocketFactory.h> #include <quic/server/QuicUDPSocketFactory.h>
@ -367,13 +366,6 @@ class QuicServerWorker : public folly::AsyncUDPSocket::ReadCallback,
void setUnfinishedHandshakeLimit(std::function<int()> limitFn); void setUnfinishedHandshakeLimit(std::function<int()> limitFn);
/*
* Get a reference to this worker's corresponding CCPReader.
* Each worker has a CCPReader that handles recieving messages from CCP
* and dispatching them to the correct connection.
*/
FOLLY_NODISCARD CCPReader* getCcpReader() const noexcept;
// Read callback // Read callback
void getReadBuffer(void** buf, size_t* len) noexcept override; void getReadBuffer(void** buf, size_t* len) noexcept override;
@ -722,8 +714,6 @@ class QuicServerWorker : public folly::AsyncUDPSocket::ReadCallback,
// List of AcceptObservers // List of AcceptObservers
AcceptObserverList observerList_; AcceptObserverList observerList_;
std::unique_ptr<CCPReader> ccpReader_;
TimePoint largestPacketReceiveTime_{TimePoint::min()}; TimePoint largestPacketReceiveTime_{TimePoint::min()};
}; };

View File

@ -27,10 +27,6 @@
#include <quic/state/SimpleFrameFunctions.h> #include <quic/state/SimpleFrameFunctions.h>
#include <quic/state/StateData.h> #include <quic/state/StateData.h>
#ifdef CCP_ENABLED
#include <ccp/ccp.h>
#endif
#include <folly/ExceptionWrapper.h> #include <folly/ExceptionWrapper.h>
#include <folly/IPAddress.h> #include <folly/IPAddress.h>
#include <folly/Overload.h> #include <folly/Overload.h>
@ -153,16 +149,6 @@ struct QuicServerConnectionState : public QuicConnectionStateBase {
// Sequence number of the last received MAX_PACING_RATE_KNOB_SEQUENCED. // Sequence number of the last received MAX_PACING_RATE_KNOB_SEQUENCED.
folly::Optional<uint64_t> maybeLastMaxPacingRateKnobSeqNum{folly::none}; folly::Optional<uint64_t> maybeLastMaxPacingRateKnobSeqNum{folly::none};
#ifdef CCP_ENABLED
// Pointer to struct that maintains state needed for interacting with libccp.
// Once instance of this struct is created for each instance of
// QuicServerWorker (but lives in the worker's corresponding CCPReader). We
// need to store a pointer to it here, because it needs to be accessible by
// the QuicCCP congestion control algorithm, which only has access to the
// connection's QuicConnectionStateBase.
struct ccp_datapath* ccpDatapath;
#endif
folly::Optional<ConnectionIdData> createAndAddNewSelfConnId() override; folly::Optional<ConnectionIdData> createAndAddNewSelfConnId() override;
QuicServerConnectionState( QuicServerConnectionState(

View File

@ -55,11 +55,6 @@ struct BbrConfig {
folly::Optional<AckFrequencyConfig> ackFrequencyConfig; folly::Optional<AckFrequencyConfig> ackFrequencyConfig;
}; };
struct CcpConfig {
std::string alg_name = "";
std::string alg_args = "";
};
struct DatagramConfig { struct DatagramConfig {
bool enabled{false}; bool enabled{false};
bool framePerPacket{true}; bool framePerPacket{true};
@ -209,8 +204,6 @@ struct TransportSettings {
bool shouldUseRecvmmsgForBatchRecv{false}; bool shouldUseRecvmmsgForBatchRecv{false};
// Config struct for BBR // Config struct for BBR
BbrConfig bbrConfig; BbrConfig bbrConfig;
// Config struct for CCP
CcpConfig ccpConfig;
// A packet is considered loss when a packet that's sent later by at least // A packet is considered loss when a packet that's sent later by at least
// timeReorderingThreshold * RTT is acked by peer. // timeReorderingThreshold * RTT is acked by peer.
DurationRep timeReorderingThreshDividend{ DurationRep timeReorderingThreshDividend{

View File

@ -20,7 +20,6 @@
#include <quic/congestion_control/ServerCongestionControllerFactory.h> #include <quic/congestion_control/ServerCongestionControllerFactory.h>
#include <quic/fizz/client/handshake/FizzClientQuicHandshakeContext.h> #include <quic/fizz/client/handshake/FizzClientQuicHandshakeContext.h>
#include <quic/server/AcceptObserver.h> #include <quic/server/AcceptObserver.h>
#include <quic/server/QuicCcpThreadLauncher.h>
#include <quic/server/QuicServer.h> #include <quic/server/QuicServer.h>
#include <quic/server/QuicServerTransport.h> #include <quic/server/QuicServerTransport.h>
#include <quic/server/QuicSharedUDPSocketFactory.h> #include <quic/server/QuicSharedUDPSocketFactory.h>
@ -39,8 +38,7 @@ DEFINE_uint64(
DEFINE_uint64(writes_per_loop, 44, "Amount of socket writes per event loop"); DEFINE_uint64(writes_per_loop, 44, "Amount of socket writes per event loop");
DEFINE_uint64(window, 1024 * 1024, "Flow control window size"); DEFINE_uint64(window, 1024 * 1024, "Flow control window size");
DEFINE_bool(autotune_window, true, "Automatically increase the receive window"); DEFINE_bool(autotune_window, true, "Automatically increase the receive window");
DEFINE_string(congestion, "cubic", "newreno/cubic/bbr/ccp/none"); DEFINE_string(congestion, "cubic", "newreno/cubic/bbr/none");
DEFINE_string(ccp_config, "", "Additional args to pass to ccp");
DEFINE_bool(pacing, false, "Enable pacing"); DEFINE_bool(pacing, false, "Enable pacing");
DEFINE_uint64( DEFINE_uint64(
max_pacing_rate, max_pacing_rate,
@ -138,8 +136,8 @@ class TPerfObserver : public LegacyObserver {
}; };
/** /**
* A helper accpetor observer that installs life cycle observers to * A helper acceptor observer that installs life cycle observers to
* transport upon accpet * transport upon accept
*/ */
class TPerfAcceptObserver : public AcceptObserver { class TPerfAcceptObserver : public AcceptObserver {
public: public:
@ -483,16 +481,6 @@ class TPerfServer {
server_->setCongestionControllerFactory( server_->setCongestionControllerFactory(
std::make_shared<ServerCongestionControllerFactory>()); std::make_shared<ServerCongestionControllerFactory>());
server_->setTransportSettings(settings); server_->setTransportSettings(settings);
if (congestionControlType == quic::CongestionControlType::CCP) {
#ifdef CCP_ENABLED
quicCcpThreadLauncher_.start(FLAGS_ccp_config);
server_->setCcpId(quicCcpThreadLauncher_.getCcpId());
#else
LOG(ERROR)
<< "To use CCP you must recompile tperf and all dependencies with -DCCP_ENABLED";
#endif
}
} }
void start() { void start() {
@ -514,7 +502,6 @@ class TPerfServer {
folly::EventBase eventBase_; folly::EventBase eventBase_;
std::unique_ptr<TPerfAcceptObserver> acceptObserver_; std::unique_ptr<TPerfAcceptObserver> acceptObserver_;
std::shared_ptr<quic::QuicServer> server_; std::shared_ptr<quic::QuicServer> server_;
quic::QuicCcpThreadLauncher quicCcpThreadLauncher_;
}; };
class TPerfClient : public quic::QuicSocket::ConnectionSetupCallback, class TPerfClient : public quic::QuicSocket::ConnectionSetupCallback,