1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-04-18 17:24:03 +03:00
mvfst/quic/server/QuicServer.cpp
Aman Sharma 2f33a3681a Introduce a "BufHelpers" typealias
Summary: This introduces a more generic typealias so that we can, for instance, write `BufHelpers::createCombined` instead of `folly::IOBuf::createCombined`.

Reviewed By: jbeshay

Differential Revision: D73127508

fbshipit-source-id: d585790904efc8e9f92d79cbf766bafe0e84a69f
2025-04-17 11:57:01 -07:00

856 lines
28 KiB
C++

/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/server/QuicServer.h>
#include <folly/Random.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/portability/GFlags.h>
#include <quic/codec/DefaultConnectionIdAlgo.h>
#include <quic/codec/QuicHeaderCodec.h>
#include <quic/server/QuicReusePortUDPSocketFactory.h>
#include <quic/server/QuicServerTransport.h>
#include <quic/server/QuicSharedUDPSocketFactory.h>
#include <quic/server/SlidingWindowRateLimiter.h>
#include <iterator>
// Selects how workers receive datagrams: when true, newWorkerWithoutSocket()
// requests the RECVMSG_MULTISHOT or RECVMSG event callback (depending on
// whether the backend supports multishot); when false it requests NONE.
FOLLY_GFLAGS_DEFINE_bool(
qs_io_uring_use_async_recv,
true,
"io_uring backend use async recv");
// When non-zero, the QuicServer constructor uses this value as the connection
// id version, and setConnectionIdVersion() will not override it.
FOLLY_GFLAGS_DEFINE_int32(
qs_conn_id_version,
0,
"connection id format version quic server used for encoding. only non-zero version is honored");
namespace {
using namespace quic;
// Computes the index of the worker a packet should be routed to: the worker
// id parsed out of the destination connection id, modulo the worker count.
// This **MUST** be kept in sync with the BPF program (if supplied).
// NOTE(review): the parse result is dereferenced without checking whether
// parsing succeeded -- confirm callers only pass server-issued connection ids.
size_t getWorkerToRouteTo(
    const RoutingData& routingData,
    size_t numWorkers,
    ConnectionIdAlgo* connIdAlgo) {
  auto parsedConnId =
      connIdAlgo->parseConnectionId(routingData.destinationConnId);
  return parsedConnId->workerId % numWorkers;
}
// Message prefix streamed into CHECK failures in this file; call sites append
// __func__ so the message ends with the name of the offending method.
constexpr std::string_view kQuicServerNotInitialized =
"Quic server is not initialized. "
"Consider calling waitUntilInitialized() prior to: ";
// Fails a CHECK unless the calling thread is `expectedThreadId`.
// Used to enforce that configuration methods run on the server's main thread.
void checkRunningInThread(const std::thread::id expectedThreadId) {
  CHECK(std::this_thread::get_id() == expectedThreadId);
}
// Deleter used when the server owns the IOExecutor: destroys it.
// (Its counterpart, unownedEvbDeleter, is a no-op for borrowed executors.)
void ownedEvbDeleter(folly::IOExecutor* evb) {
  std::default_delete<folly::IOExecutor> destroy{};
  destroy(evb);
}
// Deleter used when the server does NOT own the IOExecutor: intentionally
// does nothing so the executor is left alive.
void unownedEvbDeleter(folly::IOExecutor* /* evb */) {}
} // namespace
//
namespace quic {
// Constructs a server with validated transport settings, records the
// constructing thread as the "main thread" (used by checkRunningInThread),
// and installs the default listener / connection socket factories.
QuicServer::QuicServer(TransportSettings transportSettings)
    : transportSettings_(
          validateTransportSettings(std::move(transportSettings))),
      mainThreadId_(std::this_thread::get_id()) {
#ifdef _WIN32
  listenerSocketFactory_ = std::make_unique<QuicReusePortUDPSocketFactory>(
      true /* reusePort*/, true /* reuseAddr */);
#else
  listenerSocketFactory_ = std::make_unique<QuicReusePortUDPSocketFactory>();
#endif
  socketFactory_ = std::make_unique<QuicSharedUDPSocketFactory>();
  // Only a non-zero gflag overrides cidVersion_; otherwise cidVersion_ keeps
  // its default (V1).
  if (const auto flagCidVersion = FLAGS_qs_conn_id_version;
      flagCidVersion != 0) {
    cidVersion_ = static_cast<ConnectionIdVersion>(flagCidVersion);
  }
}
// Takes ownership of the factory that workers use to create transports when
// the default transport is enabled. Must be called on the main thread.
void QuicServer::setQuicServerTransportFactory(
    std::unique_ptr<QuicServerTransportFactory> factory) {
  checkRunningInThread(mainThreadId_);
  transportFactory_ = std::move(factory);
}
// Replaces the socket factory handed to each worker for new-connection
// sockets. Must be called on the main thread.
void QuicServer::setQuicUDPSocketFactory(
    std::unique_ptr<QuicUDPSocketFactory> factory) {
  checkRunningInThread(mainThreadId_);
  socketFactory_ = std::move(factory);
}
// Replaces the factory used to create each worker's listening socket (and
// takeover-handler sockets). Must be called on the main thread.
void QuicServer::setListenerSocketFactory(
    std::unique_ptr<QuicUDPSocketFactory> factory) {
  checkRunningInThread(mainThreadId_);
  listenerSocketFactory_ = std::move(factory);
}
// Sets the congestion controller factory passed to every worker.
// Must be called on the main thread, before the server is initialized, and
// with a non-null factory; each violation fails a CHECK.
void QuicServer::setCongestionControllerFactory(
    std::shared_ptr<CongestionControllerFactory> ccFactory) {
  checkRunningInThread(mainThreadId_);
  CHECK(!initialized_) << kQuicServerNotInitialized << __func__;
  CHECK(ccFactory);
  ccFactory_ = std::move(ccFactory);
}
// Records the rate-limit parameters. Workers created afterwards get a
// SlidingWindowRateLimiter built from `count` and `window`.
// Must be called on the main thread.
void QuicServer::setRateLimit(
    std::function<uint64_t()> count,
    std::chrono::seconds window) {
  checkRunningInThread(mainThreadId_);
  rateLimit_ = folly::make_optional<RateLimit>(std::move(count), window);
}
// Stores the callback supplying the unfinished-handshake limit; it is handed
// to each worker when the worker is created. Must be called on the main thread.
void QuicServer::setUnfinishedHandshakeLimit(std::function<int()> limitFn) {
  checkRunningInThread(mainThreadId_);
  unfinishedHandshakeLimitFn_ = std::move(limitFn);
}
// Stores the list of QUIC versions that is passed to each worker at creation.
// Must be called on the main thread.
void QuicServer::setSupportedVersion(const std::vector<QuicVersion>& versions) {
  checkRunningInThread(mainThreadId_);
  supportedVersions_ = versions;
}
// Stores the process id that is passed to each worker at creation.
// Must be called on the main thread.
void QuicServer::setProcessId(ProcessId id) noexcept {
  checkRunningInThread(mainThreadId_);
  processId_ = id;
}
// Returns the process id previously stored via setProcessId().
ProcessId QuicServer::getProcessId() const noexcept {
  return processId_;
}
// Returns true once the last worker has been bound to its socket
// (see bindWorkersToSocket).
bool QuicServer::isInitialized() const noexcept {
  return initialized_;
}
// Creates the worker event-base threads, initializes the server on `address`
// and starts all workers. The server owns the event bases created here.
//
// @param address     address every worker binds to.
// @param maxWorkers  upper bound on the number of workers; 0 means "one per
//                    hardware thread". Must not exceed 255 (worker ids are
//                    uint8_t).
//
// The worker count is min(hardware threads, maxWorkers). Two edge cases are
// handled explicitly so they do not trip the CHECKs in initializeImpl():
//  - std::thread::hardware_concurrency() may return 0 when the value is not
//    computable; in that case a single hardware thread is assumed.
//  - when maxWorkers is 0 on a host with more than 255 hardware threads, the
//    default is clamped to the 255-worker limit.
//
// Must be called on the main thread, with a TLS context already set.
void QuicServer::start(const folly::SocketAddress& address, size_t maxWorkers) {
  checkRunningInThread(mainThreadId_);
  CHECK(ctx_) << "Must set a TLS context for the Quic server";
  CHECK_LE(maxWorkers, std::numeric_limits<uint8_t>::max());
  constexpr size_t kMaxWorkerCount = std::numeric_limits<uint8_t>::max();
  size_t numCpu = std::thread::hardware_concurrency();
  if (numCpu == 0) {
    // The hardware thread count is unknown; still run at least one worker.
    numCpu = 1;
  }
  if (maxWorkers == 0) {
    // The default must respect the same limit explicit values are checked
    // against above.
    maxWorkers = std::min(numCpu, kMaxWorkerCount);
  }
  auto const backendDetails = getEventBaseBackendDetails();
  backendSupportsMultishotCallback_ = backendDetails.supportsRecvmsgMultishot;
  auto numWorkers = std::min(numCpu, maxWorkers);
  // ::start() is the api for QuicServer to construct and own the EventBases the
  // QuicServerWorkers are running on
  std::vector<MaybeOwnedEvbPtr> ownedEvbs;
  ownedEvbs.reserve(numWorkers);
  for (size_t i = 0; i < numWorkers; ++i) {
    auto scopedEvb = std::make_unique<folly::ScopedEventBaseThread>(
        folly::EventBase::Options().setBackendFactory(backendDetails.factory),
        nullptr,
        "");
    ownedEvbs.push_back({scopedEvb.release(), ownedEvbDeleter});
  }
  initializeImpl(address, std::move(ownedEvbs), true /* useDefaultTransport */);
  start();
}
// Initializes the server on caller-provided event bases. The event bases are
// wrapped with a no-op deleter, so the server never destroys them.
// Must be called on the main thread.
void QuicServer::initialize(
    const folly::SocketAddress& address,
    const std::vector<folly::EventBase*>& evbs,
    bool useDefaultTransport) {
  checkRunningInThread(mainThreadId_);
  // Re-wrap every borrowed event base as a MaybeOwnedEvbPtr without ownership.
  std::vector<MaybeOwnedEvbPtr> borrowedEvbs;
  borrowedEvbs.reserve(evbs.size());
  for (folly::EventBase* borrowed : evbs) {
    auto* asExecutor = static_cast<folly::IOExecutor*>(borrowed);
    borrowedEvbs.push_back({asExecutor, unownedEvbDeleter});
  }
  initializeImpl(address, std::move(borrowedEvbs), useDefaultTransport);
}
// Common initialization path for start() and initialize(): installs default
// factories where none were set, adopts the event bases, creates one worker
// per event base and binds every worker to `address`.
//
// @param address              address the workers bind to.
// @param evbs                 event bases (owned or borrowed) for the workers;
//                             must be non-empty and hold at most 255 entries.
// @param useDefaultTransport  forwarded to initializeWorkers().
//
// Must be called on the main thread while the server is in the shut-down
// state (CHECK(shutdown_)); the shutdown flag is cleared here.
void QuicServer::initializeImpl(
    const folly::SocketAddress& address,
    std::vector<MaybeOwnedEvbPtr> evbs,
    bool useDefaultTransport) {
  checkRunningInThread(mainThreadId_);
  CHECK(!evbs.empty());
  // uint8_t is a character type for iostreams: streaming the limit directly
  // would emit a single raw byte instead of "255", so widen it to int first.
  CHECK_LE(evbs.size(), std::numeric_limits<uint8_t>::max())
      << "Quic Server does not support more than "
      << static_cast<int>(std::numeric_limits<uint8_t>::max()) << " workers";
  CHECK(shutdown_);
  shutdown_ = false;
  // if the connid algo factory is not set, use default impl
  if (!connIdAlgoFactory_) {
    connIdAlgoFactory_ = std::make_unique<DefaultConnectionIdAlgoFactory>();
  }
  connIdAlgo_ = connIdAlgoFactory_->make();
  // likewise fall back to the default congestion controller factory
  if (!ccFactory_) {
    ccFactory_ = std::make_shared<ServerCongestionControllerFactory>();
  }
  workerEvbs_.swap(evbs);
  initializeWorkers(useDefaultTransport);
  bindWorkersToSocket(address);
}
// Creates one socket-less worker per event base, in event-base order, and
// records the event-base -> worker mapping. The worker id is the index of its
// event base. When `useDefaultTransport` is set, each worker also receives
// the server's transport factory (which must be set) and TLS context.
void QuicServer::initializeWorkers(bool useDefaultTransport) {
  VLOG(4) << "Initializing workers";
  CHECK(workers_.empty());
  // Walk the event bases in insertion order so worker ids match positions.
  auto workerEvbs = workerEvbs_.rlock();
  const size_t evbCount = workerEvbs->size();
  for (size_t workerIdx = 0; workerIdx < evbCount; ++workerIdx) {
    auto worker = newWorkerWithoutSocket();
    if (useDefaultTransport) {
      CHECK(transportFactory_) << "Transport factory is not set";
      worker->setTransportFactory(transportFactory_.get());
      worker->setFizzContext(ctx_);
    }
    worker->setWorkerId(workerIdx);
    auto* rawWorker = worker.get();
    workers_.push_back(std::move(worker));
    evbToWorkers_.emplace(
        (*workerEvbs)[workerIdx]->getEventBase(), rawWorker);
  }
}
// Builds a QuicServerWorker configured from the server's current settings
// (versions, ids, stats, congestion control, rate limiting, ...). The worker
// has no listening socket yet; that is attached in bindWorkersToSocket().
std::unique_ptr<QuicServerWorker> QuicServer::newWorkerWithoutSocket() {
  // Receive-callback mode: NONE unless the async-recv flag is on, in which
  // case multishot is preferred when the backend supports it.
  auto sec = QuicServerWorker::SetEventCallback::NONE;
  if (FLAGS_qs_io_uring_use_async_recv) {
    sec = backendSupportsMultishotCallback_
        ? QuicServerWorker::SetEventCallback::RECVMSG_MULTISHOT
        : QuicServerWorker::SetEventCallback::RECVMSG;
  }

  auto worker = std::make_unique<QuicServerWorker>(
      this->shared_from_this(), transportSettings_, sec);

  worker->setNewConnectionSocketFactory(socketFactory_.get());
  worker->setSupportedVersions(supportedVersions_);
  worker->rejectNewConnections(rejectNewConnections_);
  worker->setProcessId(processId_);
  worker->setHostId(hostId_);
  worker->setConnectionIdVersion(cidVersion_);

  if (healthCheckToken_) {
    worker->setHealthCheckToken(*healthCheckToken_);
  }

  // Each worker gets its own stats callback instance from the factory.
  if (transportStatsFactory_) {
    auto statsCallback = transportStatsFactory_->make();
    CHECK(statsCallback);
    worker->setTransportStatsCallback(std::move(statsCallback));
  }

  worker->setConnectionIdAlgo(connIdAlgoFactory_->make());
  worker->setCongestionControllerFactory(ccFactory_);

  if (rateLimit_) {
    worker->setRateLimiter(std::make_unique<SlidingWindowRateLimiter>(
        rateLimit_->count, rateLimit_->window));
  }

  worker->setUnfinishedHandshakeLimit(unfinishedHandshakeLimitFn_);
  worker->setTransportSettingsOverrideFn(transportSettingsOverrideFn_);
  return worker;
}
// Gives every worker its listening socket. For each worker, on that worker's
// event base thread (blocking until done):
//  - if listeningFDs_ has a non-negative fd at the worker's index, the socket
//    is set up on a dup() of that fd and all socket options are applied;
//  - otherwise a fresh socket is bound to boundAddress_.
// Worker 0 updates boundAddress_ with the address it actually got. After the
// last worker is set up, initialized_ is set and startDoneBaton_ is posted.
// Requires that the server is not yet initialized.
void QuicServer::bindWorkersToSocket(const folly::SocketAddress& address) {
auto workerEvbs = workerEvbs_.rlock();
auto numWorkers = workerEvbs->size();
CHECK(!initialized_);
boundAddress_ = address;
for (size_t i = 0; i < numWorkers; ++i) {
auto* workerEvb = (*workerEvbs)[i]->getEventBase();
// The closure keeps the server alive via `self` and copies everything else
// it needs by value.
workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait(
[self = this->shared_from_this(),
workerEvb,
numWorkers,
processId = processId_,
idx = i] {
std::lock_guard<std::mutex> guard(self->startMutex_);
// Nothing to bind if the server was shut down in the meantime.
if (self->shutdown_) {
return;
}
auto workerSocket = self->listenerSocketFactory_->make(workerEvb, -1);
auto it = self->evbToWorkers_.find(workerEvb);
CHECK(it != self->evbToWorkers_.end());
auto worker = it->second;
// Pick up the fd supplied for this worker index, if any (-1 otherwise).
int takeoverOverFd = -1;
if (self->listeningFDs_.size() > idx) {
takeoverOverFd = self->listeningFDs_[idx];
}
worker->setSocketOptions(&self->socketOptions_);
// use a dup() of the taken-over fd when one is available for this worker,
// otherwise bind a new socket
if (takeoverOverFd >= 0) {
workerSocket->setFD(
folly::NetworkSocket::fromFd(::dup(takeoverOverFd)),
// set ownership to OWNS to allow ::close()'ing of the fd
// when this server goes away
FollyAsyncUDPSocketAlias::FDOwnership::OWNS);
worker->setSocket(std::move(workerSocket));
if (idx == 0) {
self->boundAddress_ = worker->getAddress();
}
VLOG(4) << "Set up dup()'ed fd for address=" << self->boundAddress_
<< " on workerId=" << (int)worker->getWorkerId();
worker->applyAllSocketOptions();
} else {
VLOG(4) << "No valid takenover fd found for address="
<< self->boundAddress_ << ". binding on worker=" << worker
<< " workerId=" << (int)worker->getWorkerId()
<< " processId=" << (int)processId;
worker->setSocket(std::move(workerSocket));
worker->bind(self->boundAddress_, self->bindOptions_);
// the first worker records the address it was actually bound to; later
// workers bind to that recorded address
if (idx == 0) {
self->boundAddress_ = worker->getAddress();
}
}
// The last worker marks the server initialized and wakes any thread
// blocked in waitUntilInitialized(); call_once ensures a single post.
if (idx == (numWorkers - 1)) {
VLOG(4) << "Initialized all workers in the eventbase";
self->initialized_ = true;
folly::call_once(
self->startDone_, [self]() { self->startDoneBaton_.post(); });
}
});
}
}
// Starts every worker on its own event base thread. Before that, schedules
// on each worker thread the installation of the thread-local worker pointer.
// Must be called on the main thread, after the server is initialized.
void QuicServer::start() {
  checkRunningInThread(mainThreadId_);
  CHECK(initialized_) << kQuicServerNotInitialized << __func__;
  // Point each worker thread's thread-local at its worker. The deleter is a
  // no-op because QuicServer, not the ThreadLocalPtr, owns the workers.
  runOnAllWorkers([&](auto worker) mutable {
    auto noopDeleter = [](auto /* worker */, folly::TLPDestructionMode) {};
    workerPtr_.reset(worker, noopDeleter);
  });
  for (auto& worker : workers_) {
    auto* workerEvb = worker->getEventBase();
    workerEvb->runInEventBaseThread([&worker] { worker->start(); });
  }
}
// Synchronously sets up the takeover handler on every worker, using a new
// listener socket per worker and the given address, then marks the takeover
// handlers as initialized.
// Must be called on the main thread (never from a worker thread), with
// workers present and the server not shut down.
void QuicServer::allowBeingTakenOver(const folly::SocketAddress& addr) {
  checkRunningInThread(mainThreadId_);
  CHECK(!workers_.empty());
  CHECK(!shutdown_);
  // Debug-only guard: this must not run on any worker's thread.
  for (auto& worker : workers_) {
    DCHECK(
        // if the eventbase is not running, it returns true for isInEvbThread()
        !worker->getEventBase()->isRunning() ||
        !worker->getEventBase()->isInEventBaseThread());
  }
  // TODO workers_ (vector) is not protected against concurrent modifications
  const auto workerCount = workers_.size();
  for (size_t workerIdx = 0; workerIdx < workerCount; ++workerIdx) {
    auto workerEvb = workers_[workerIdx]->getEventBase();
    // Runs on the worker's thread and blocks until done, so capturing by
    // reference is safe here.
    workerEvb->runInEventBaseThreadAndWait([this, &workerEvb, &addr] {
      std::lock_guard<std::mutex> guard(startMutex_);
      CHECK(initialized_);
      auto takeoverSocket = listenerSocketFactory_->make(workerEvb, -1);
      auto it = evbToWorkers_.find(workerEvb);
      CHECK(it != evbToWorkers_.end());
      it->second->allowBeingTakenOver(std::move(takeoverSocket), addr);
    });
  }
  VLOG(4) << "Bind all workers in the eventbase to takeover handler port";
  takeoverHandlerInitialized_ = true;
}
// Synchronously re-points every worker's takeover handler at `addr`, giving
// each a new listener socket. Returns the address reported by the last worker
// processed.
// Must be called on the main thread (never from a worker thread), after
// allowBeingTakenOver(), with workers present and the server not shut down.
folly::SocketAddress QuicServer::overrideTakeoverHandlerAddress(
    const folly::SocketAddress& addr) {
  checkRunningInThread(mainThreadId_);
  CHECK(!workers_.empty());
  CHECK(!shutdown_);
  CHECK(takeoverHandlerInitialized_) << "TakeoverHanders are not initialized. ";
  // Debug-only guard: this must not run on any worker's thread.
  for (auto& worker : workers_) {
    DCHECK(
        // if the eventbase is not running, it returns true for isInEvbThread()
        !worker->getEventBase()->isRunning() ||
        !worker->getEventBase()->isInEventBaseThread());
  }
  folly::SocketAddress boundAddress;
  for (auto& worker : workers_) {
    // Blocks until the worker thread has finished, so by-reference captures
    // stay valid.
    worker->getEventBase()->runInEventBaseThreadAndWait(
        [this, &worker, &addr, &boundAddress] {
          std::lock_guard<std::mutex> guard(startMutex_);
          CHECK(initialized_);
          auto takeoverSocket =
              listenerSocketFactory_->make(worker->getEventBase(), -1);
          boundAddress = worker->overrideTakeoverHandlerAddress(
              std::move(takeoverSocket), addr);
        });
  }
  return boundAddress;
}
// Synchronously calls pauseRead() on every worker.
// Must be called on the main thread.
void QuicServer::pauseRead() {
  checkRunningInThread(mainThreadId_);
  runOnAllWorkersSync([](auto worker) { worker->pauseRead(); });
}
// Hands an incoming packet to the worker that should process it.
//  - Dropped (with a stats callback, if this thread has a worker) when the
//    server is not initialized or has been shut down.
//  - Packets with a client-chosen destination connection id are dispatched to
//    the current thread's worker, when there is one.
//  - Otherwise the target worker is derived from the destination connection
//    id; the packet is dispatched inline if already on that worker's thread,
//    or posted to the worker's event base if not.
void QuicServer::routeDataToWorker(
    const folly::SocketAddress& client,
    RoutingData&& routingData,
    NetworkData&& networkData,
    Optional<QuicVersion> quicVersion,
    folly::EventBase*,
    bool isForwardedData) {
  if (!initialized_) {
    // drop the packet if we are not initialized. This is a janky memory
    // barrier.
    VLOG(4) << "Dropping data since quic-server is not initialized";
    if (workerPtr_) {
      QUIC_STATS(
          workerPtr_->getTransportStatsCallback(),
          onPacketDropped,
          PacketDropReason::WORKER_NOT_INITIALIZED);
    }
    return;
  }

  if (shutdown_) {
    VLOG(4) << "Dropping data since quic server is shutdown";
    if (workerPtr_) {
      QUIC_STATS(
          workerPtr_->getTransportStatsCallback(),
          onPacketDropped,
          PacketDropReason::SERVER_SHUTDOWN);
    }
    return;
  }

  // For initial or zeroRtt packets, stay on the worker that kernel / bpf
  // routed to. Otherwise, whenever the (bpf / kernel) hash and the userspace
  // hash get out of sync (e.g. due to shuffling of sockets in the hash ring),
  // the result is a very high amount of 'misses'.
  if (routingData.clientChosenDcid && workerPtr_) {
    CHECK(workerPtr_->getEventBase()->isInEventBaseThread());
    workerPtr_->dispatchPacketData(
        client,
        std::move(routingData),
        std::move(networkData),
        quicVersion,
        isForwardedData);
    return;
  }

  // Resolve the target worker from the destination connection id.
  auto workerToRunOn =
      getWorkerToRouteTo(routingData, workers_.size(), connIdAlgo_.get());
  auto& worker = workers_[workerToRunOn];
  folly::EventBase* workerEvb = worker->getEventBase();
  const bool isInEvb = workerEvb->isInEventBaseThread();
  VLOG_IF(4, !isInEvb)
      << " Routing to worker in different EVB, to workerId=" << workerToRunOn;

  if (!isInEvb) {
    // Cross-thread hand-off: move the packet into a closure that runs on the
    // target worker's thread. `self` keeps the server alive until it runs.
    workerEvb->runInEventBaseThread(
        [self = this->shared_from_this(),
         clientAddr = client,
         routing = std::move(routingData),
         targetWorker = worker.get(),
         packetData = std::move(networkData),
         forwarded = isForwardedData,
         quicVersion]() mutable {
          if (self->shutdown_) {
            return;
          }
          targetWorker->dispatchPacketData(
              clientAddr,
              std::move(routing),
              std::move(packetData),
              quicVersion,
              forwarded);
        });
    return;
  }

  // Already on the right thread: dispatch inline.
  worker->dispatchPacketData(
      client,
      std::move(routingData),
      std::move(networkData),
      quicVersion,
      isForwardedData);
}
// Reacts to an error reported by a worker by shutting the whole server down
// with that error code.
void QuicServer::handleWorkerError(LocalErrorCode error) {
  shutdown(error);
}
// Blocks the caller until all workers have been initialized or the server
// has been shut down. Returns immediately if either is already the case.
// Must not be called from a worker's event base thread.
void QuicServer::waitUntilInitialized() {
  const bool nothingToWaitFor = shutdown_ || initialized_;
  if (nothingToWaitFor) {
    return;
  }
  // Blocking on a worker thread would stall the worker being waited on.
  for (auto& worker : workers_) {
    CHECK(!worker->getEventBase()->isInEventBaseThread());
  }
  // Posted by bindWorkersToSocket() (initialized) or shutdown().
  startDoneBaton_.wait();
  CHECK(initialized_ || shutdown_);
}
// Shuts the server down (a no-op if it is already shut down).
QuicServer::~QuicServer() {
  shutdown(LocalErrorCode::SHUTTING_DOWN);
}
// Shuts the server down: for each worker, synchronously closes all of its
// connections with `error` and clears that thread's worker pointer, then
// removes the worker from the event-base map. Finally posts the start baton
// (once) so that waitUntilInitialized() callers are released.
// Does nothing if the server is already shut down.
void QuicServer::shutdown(LocalErrorCode error) {
  if (shutdown_) {
    return;
  }
  shutdown_ = true;
  for (auto& worker : workers_) {
    // Runs on the worker's thread and blocks until finished, so reference
    // captures remain valid.
    worker->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
        [this, &worker, &error] {
          worker->shutdownAllConnections(error);
          workerPtr_.reset();
        });
    // The map may be read concurrently from other threads, so the erase is
    // done under the mutex.
    std::lock_guard<std::mutex> guard(startMutex_);
    evbToWorkers_.erase(worker->getEventBase());
  }
  folly::call_once(startDone_, [this]() { this->startDoneBaton_.post(); });
}
// Returns the current value of the shutdown flag.
bool QuicServer::hasShutdown() const noexcept {
  return shutdown_;
}
// Schedules `func` to run on every worker's event base thread without waiting
// for completion. Skipped entirely if the server is shut down; each scheduled
// call re-checks the shutdown flag before invoking `func`.
void QuicServer::runOnAllWorkers(
    const std::function<void(QuicServerWorker*)>& func) {
  std::lock_guard<std::mutex> guard(startMutex_);
  if (shutdown_) {
    return;
  }
  for (auto& worker : workers_) {
    auto* workerEvb = worker->getEventBase();
    // `self` keeps the server (and thus `worker`) alive until the task runs.
    workerEvb->runInEventBaseThread(
        [&worker, self = this->shared_from_this(), func]() mutable {
          if (!self->shutdown_) {
            func(worker.get());
          }
        });
  }
}
// Runs `func` on every worker's event base thread, one worker at a time,
// waiting for each call to finish. Skipped entirely if the server is shut
// down; each call re-checks the shutdown flag before invoking `func`.
void QuicServer::runOnAllWorkersSync(
    const std::function<void(QuicServerWorker*)>& func) {
  std::lock_guard<std::mutex> guard(startMutex_);
  if (shutdown_) {
    return;
  }
  for (auto& worker : workers_) {
    auto* workerEvb = worker->getEventBase();
    workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait(
        [&worker, self = this->shared_from_this(), func]() mutable {
          if (!self->shutdown_) {
            func(worker.get());
          }
        });
  }
}
// Updates the host id. If the value is unchanged, only a warning is logged.
// Otherwise the new id is stored and, when the server is already initialized,
// synchronously pushed to every worker.
void QuicServer::setHostId(uint32_t hostId) noexcept {
  // TODO(damlaj): T205141168; lift commented checkRunningInThread check below
  // checkRunningInThread(mainThreadId_);
  if (hostId_ == hostId) {
    LOG(WARNING) << "HostId is already set to " << hostId;
    return;
  }
  hostId_ = hostId;
  VLOG(4) << "Setting hostId to " << hostId_ << " for quic server ";
  if (!initialized_) {
    // Workers do not exist yet; they pick up hostId_ when they are created.
    return;
  }
  runOnAllWorkersSync(
      [hostId](auto worker) mutable { worker->setHostId(hostId); });
}
// Sets the connection id version, unless --qs_conn_id_version was given on
// the command line, in which case the flag wins and an error is logged.
// Must be called on the main thread, before the server is initialized.
void QuicServer::setConnectionIdVersion(
    ConnectionIdVersion cidVersion) noexcept {
  checkRunningInThread(mainThreadId_);
  CHECK(!initialized_) << kQuicServerNotInitialized << __func__;
  if (FLAGS_qs_conn_id_version == 0) {
    cidVersion_ = cidVersion;
    return;
  }
  LOG(ERROR) << "Connection Id Version has been set to "
             << static_cast<int>(cidVersion_)
             << " by --qs_conn_id_version from the command line.";
}
// Stores the transport-settings override callback that is handed to each
// worker at creation.
// Must be called on the main thread, before the server is initialized.
void QuicServer::setTransportSettingsOverrideFn(
    TransportSettingsOverrideFn fn) {
  checkRunningInThread(mainThreadId_);
  CHECK(!initialized_) << kQuicServerNotInitialized << __func__;
  transportSettingsOverrideFn_ = std::move(fn);
}
// Validates and stores the health-check token, then schedules it to be set on
// every existing worker. The token must not parse as a QUIC header and must be
// longer than kMinHealthCheckTokenSize; both are enforced with CHECKs.
// Must be called on the main thread.
void QuicServer::setHealthCheckToken(const std::string& healthCheckToken) {
  checkRunningInThread(mainThreadId_);
  // A token that looks like a valid quic header would be ambiguous with real
  // traffic, so reject it.
  auto parsed = parseHeader(*BufHelpers::copyBuffer(healthCheckToken));
  CHECK(!parsed.hasValue());
  CHECK_GT(healthCheckToken.size(), kMinHealthCheckTokenSize);
  healthCheckToken_ = healthCheckToken;
  runOnAllWorkers([healthCheckToken](auto worker) mutable {
    worker->setHealthCheckToken(healthCheckToken);
  });
}
// Stores the TLS context and schedules it to be set on every existing worker.
// Must be called on the main thread.
void QuicServer::setFizzContext(
    std::shared_ptr<const fizz::server::FizzServerContext> ctx) {
  checkRunningInThread(mainThreadId_);
  ctx_ = ctx;
  runOnAllWorkers([ctx](auto worker) mutable {
    worker->setFizzContext(ctx);
  });
}
// Sets the TLS context on the single worker that runs on `evb`, executing on
// that event base's thread and waiting for completion. Both arguments must be
// non-null, and `evb` must belong to one of this server's workers (CHECKed)
// unless the server has been shut down, in which case nothing happens.
void QuicServer::setFizzContext(
    folly::EventBase* evb,
    std::shared_ptr<const fizz::server::FizzServerContext> ctx) {
  CHECK(evb);
  CHECK(ctx);
  evb->runImmediatelyOrRunInEventBaseThreadAndWait([this, &evb, &ctx] {
    std::lock_guard<std::mutex> guard(startMutex_);
    if (shutdown_) {
      return;
    }
    auto it = evbToWorkers_.find(evb);
    CHECK(it != evbToWorkers_.end());
    auto* targetWorker = it->second;
    targetWorker->setFizzContext(ctx);
  });
}
// Returns the (validated) transport settings the server was constructed with.
const TransportSettings& QuicServer::getTransportSettings() const noexcept {
  return transportSettings_;
}
// Stores the predicate deciding whether new connections are rejected and
// schedules it to be set on every existing worker. Workers created later
// receive the stored predicate at creation.
void QuicServer::rejectNewConnections(std::function<bool()> rejectFn) {
  rejectNewConnections_ = rejectFn;
  auto applyToWorker = [rejectFn](auto worker) mutable {
    worker->rejectNewConnections(rejectFn);
  };
  runOnAllWorkers(applyToWorker);
}
// Stores the source-port block-list predicate and schedules it to be set on
// every existing worker. Must be called on the main thread.
void QuicServer::blockListedSrcPort(
    std::function<bool(uint16_t)> isBlockListedSrcPort) {
  checkRunningInThread(mainThreadId_);
  isBlockListedSrcPort_ = isBlockListedSrcPort;
  auto applyToWorker = [isBlockListedSrcPort](auto worker) mutable {
    worker->setIsBlockListedSrcPort(isBlockListedSrcPort);
  };
  runOnAllWorkers(applyToWorker);
}
// Synchronously tells every worker to start forwarding packets to `destAddr`.
// Does nothing if the server is not initialized.
// Must be called on the main thread.
void QuicServer::startPacketForwarding(const folly::SocketAddress& destAddr) {
  checkRunningInThread(mainThreadId_);
  if (!initialized_) {
    return;
  }
  runOnAllWorkersSync([destAddr](auto worker) mutable {
    worker->startPacketForwarding(destAddr);
  });
}
// Arranges for every worker to stop packet forwarding after `delay`.
// For each worker, a timer is registered on the worker's own event base
// thread (waiting for the registration, not for the timer). When the timer
// fires, forwarding is stopped unless the server has shut down meanwhile.
// Does nothing if the server is not initialized or already shut down.
// Must be called on the main thread.
void QuicServer::stopPacketForwarding(std::chrono::milliseconds delay) {
  checkRunningInThread(mainThreadId_);
  std::lock_guard<std::mutex> guard(startMutex_);
  if (!initialized_ || shutdown_) {
    return;
  }
  for (auto& worker : workers_) {
    auto* workerEvb = worker->getEventBase();
    workerEvb->runInEventBaseThreadAndWait(
        [&worker, self = this->shared_from_this(), delay]() mutable {
          if (self->shutdown_) {
            return;
          }
          // The delayed task holds `self` so the server, and with it the
          // worker, outlives the timer.
          auto stopForwarding = [&worker, self]() mutable {
            if (worker && !self->shutdown_) {
              worker->stopPacketForwarding();
            }
          };
          worker->getEventBase()->runAfterDelay(
              std::move(stopForwarding), delay.count());
        });
  }
}
// Takes ownership of the factory used to create a stats callback for each
// worker at worker creation. The factory must be non-null.
// Must be called on the main thread.
void QuicServer::setTransportStatsCallbackFactory(
    std::unique_ptr<QuicTransportStatsCallbackFactory> statsFactory) {
  checkRunningInThread(mainThreadId_);
  CHECK(statsFactory);
  transportStatsFactory_ = std::move(statsFactory);
}
// Takes ownership of the factory that produces connection id algorithms for
// the server and its workers. The factory must be non-null.
// Must be called on the main thread, before the server is initialized.
void QuicServer::setConnectionIdAlgoFactory(
    std::unique_ptr<ConnectionIdAlgoFactory> connIdAlgoFactory) {
  checkRunningInThread(mainThreadId_);
  CHECK(!initialized_);
  CHECK(connIdAlgoFactory);
  connIdAlgoFactory_ = std::move(connIdAlgoFactory);
}
// Sets `acceptor` as the transport factory of the worker running on `evb`,
// executing on that event base's thread and waiting for completion.
// Both arguments must be non-null. Nothing happens if the server is shut
// down; an unknown event base is only logged.
void QuicServer::addTransportFactory(
    folly::EventBase* evb,
    QuicServerTransportFactory* acceptor) {
  CHECK(evb);
  CHECK(acceptor);
  evb->runImmediatelyOrRunInEventBaseThreadAndWait([this, &evb, &acceptor] {
    std::lock_guard<std::mutex> guard(startMutex_);
    if (shutdown_) {
      return;
    }
    auto it = evbToWorkers_.find(evb);
    if (it == evbToWorkers_.end()) {
      VLOG(3) << "Couldn't find associated worker for the given eventbase";
      return;
    }
    it->second->setTransportFactory(acceptor);
  });
}
// Returns the address the server is bound to (as reported by the first
// worker). The server must be initialized.
const folly::SocketAddress& QuicServer::getAddress() const {
  CHECK(initialized_) << kQuicServerNotInitialized << __func__;
  return boundAddress_;
}
// Stores the listening fds that bindWorkersToSocket() consults by worker
// index instead of binding fresh sockets. Must be called on the main thread.
void QuicServer::setListeningFDs(const std::vector<int>& fds) {
  checkRunningInThread(mainThreadId_);
  std::lock_guard<std::mutex> guard(startMutex_);
  listeningFDs_ = fds;
}
// Returns the listening socket fd of the first worker.
// The server must be initialized.
int QuicServer::getListeningSocketFD() const {
  CHECK(initialized_) << kQuicServerNotInitialized << __func__;
  const auto& firstWorker = workers_[0];
  return firstWorker->getFD();
}
// Returns the listening fds of all workers, indexed by worker id. Entries of
// workers whose fd is -1 are left at the vector's initial value (0).
// Must be called on the main thread, after the server is initialized.
std::vector<int> QuicServer::getAllListeningSocketFDs() const noexcept {
  checkRunningInThread(mainThreadId_);
  CHECK(initialized_) << kQuicServerNotInitialized << __func__;
  std::vector<int> sockets(workers_.size());
  for (const auto& worker : workers_) {
    const auto workerFd = worker->getFD();
    if (workerFd == -1) {
      continue;
    }
    CHECK_LT(worker->getWorkerId(), workers_.size());
    sockets.at(worker->getWorkerId()) = workerFd;
  }
  return sockets;
}
// Asks each worker in turn, synchronously, to add its connection stats to
// `stats`.
void QuicServer::getAllConnectionsStats(
    std::vector<QuicConnectionStats>& stats) {
  auto collectFromWorker = [&stats](auto worker) mutable {
    worker->getAllConnectionsStats(stats);
  };
  runOnAllWorkersSync(collectFromWorker);
}
// Returns the takeover protocol version reported by the first worker.
// NOTE(review): no check that workers_ is non-empty -- presumably callers only
// use this after initialization; confirm.
TakeoverProtocolVersion QuicServer::getTakeoverProtocolVersion()
    const noexcept {
  const auto& firstWorker = workers_[0];
  return firstWorker->getTakeoverProtocolVersion();
}
// Returns the takeover-handler socket fd of the first worker.
// Must be called on the main thread, after allowBeingTakenOver().
int QuicServer::getTakeoverHandlerSocketFD() const {
  checkRunningInThread(mainThreadId_);
  CHECK(takeoverHandlerInitialized_) << "TakeoverHanders are not initialized. ";
  const auto& firstWorker = workers_[0];
  return firstWorker->getTakeoverHandlerSocketFD();
}
std::vector<folly::EventBase*> QuicServer::getWorkerEvbs() const noexcept {
checkRunningInThread(mainThreadId_);
CHECK(initialized_) << kQuicServerNotInitialized << __func__;
std::vector<folly::EventBase*> ebvs;
for (const auto& worker : workers_) {
ebvs.push_back(worker->getEventBase());
}
return ebvs;
}
// Registers `observer` with the worker running on `evb`, executing on that
// event base's thread and waiting for completion.
// Returns true if the observer was added; false if the server is shut down or
// no worker is associated with `evb`.
// The server must be initialized and `evb` must be non-null.
bool QuicServer::addAcceptObserver(
    folly::EventBase* evb,
    AcceptObserver* observer) {
  CHECK(initialized_) << kQuicServerNotInitialized << __func__;
  CHECK(evb);
  bool success = false;
  evb->runImmediatelyOrRunInEventBaseThreadAndWait(
      [this, &evb, &observer, &success] {
        std::lock_guard<std::mutex> guard(startMutex_);
        if (shutdown_) {
          return;
        }
        auto it = evbToWorkers_.find(evb);
        if (it == evbToWorkers_.end()) {
          VLOG(3) << "Couldn't find associated worker for the given eventbase, "
                  << "unable to add AcceptObserver";
          success = false;
          return;
        }
        it->second->addAcceptObserver(observer);
        success = true;
      });
  return success;
}
// Unregisters `observer` from the worker running on `evb`, executing on that
// event base's thread and waiting for completion.
// Returns the worker's removal result; false if the server is shut down or no
// worker is associated with `evb`.
// The server must be initialized and `evb` must be non-null.
bool QuicServer::removeAcceptObserver(
    folly::EventBase* evb,
    AcceptObserver* observer) {
  CHECK(initialized_) << kQuicServerNotInitialized << __func__;
  CHECK(evb);
  bool success = false;
  evb->runImmediatelyOrRunInEventBaseThreadAndWait(
      [this, &evb, &observer, &success] {
        std::lock_guard<std::mutex> guard(startMutex_);
        if (shutdown_) {
          return;
        }
        auto it = evbToWorkers_.find(evb);
        if (it == evbToWorkers_.end()) {
          VLOG(3) << "Couldn't find associated worker for the given eventbase, "
                  << "unable to remove AcceptObserver";
          success = false;
          return;
        }
        success = it->second->removeAcceptObserver(observer);
      });
  return success;
}
// Stores the socket options; a pointer to them is handed to each worker when
// its listening socket is set up. Must be called on the main thread.
void QuicServer::setSocketOptions(
    const folly::SocketOptionMap& options) noexcept {
  checkRunningInThread(mainThreadId_);
  socketOptions_ = options;
}
/**
 * Controls whether the IPV6_ONLY socket option is set on the underlying
 * socket. Passing false enables IPv4-mapped IPv6 addresses on the socket.
 * The value is stored in the bind options used when workers bind.
 * Must be called on the main thread.
 */
void QuicServer::setBindV6Only(bool bindV6Only) {
  checkRunningInThread(mainThreadId_);
  bindOptions_.bindV6Only = bindV6Only;
}
// STATIC
// Returns `transportSettings` with a stateless reset token secret guaranteed
// to be present: if the caller did not provide one, a secret of
// kStatelessResetTokenSecretLength secure-random bytes is generated.
TransportSettings QuicServer::validateTransportSettings(
    TransportSettings transportSettings) {
  if (transportSettings.statelessResetTokenSecret) {
    // Caller supplied a secret; keep the settings untouched.
    return transportSettings;
  }
  std::array<uint8_t, kStatelessResetTokenSecretLength> generatedSecret;
  folly::Random::secureRandom(generatedSecret.data(), generatedSecret.size());
  transportSettings.statelessResetTokenSecret = generatedSecret;
  return transportSettings;
}
} // namespace quic